Remove std::lock_guard
[WebKit-https.git] / Source / WebKit / Platform / IPC / Connection.cpp
1 /*
2  * Copyright (C) 2010-2016 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "Connection.h"
28
29 #include "Logging.h"
30 #include "MessageFlags.h"
31 #include <memory>
32 #include <wtf/HashSet.h>
33 #include <wtf/Lock.h>
34 #include <wtf/NeverDestroyed.h>
35 #include <wtf/RunLoop.h>
36 #include <wtf/text/WTFString.h>
37 #include <wtf/threads/BinarySemaphore.h>
38
39 #if PLATFORM(COCOA)
40 #include "MachMessage.h"
41 #endif
42
43 #if USE(UNIX_DOMAIN_SOCKETS)
44 #include "UnixMessage.h"
45 #endif
46
47 namespace IPC {
48
49 #if PLATFORM(COCOA)
50 // The IPC connection gets killed if the incoming message queue reaches 50000 messages before the main thread has a chance to dispatch them.
51 const size_t maxPendingIncomingMessagesKillingThreshold { 50000 };
52 #endif
53
54 std::atomic<unsigned> UnboundedSynchronousIPCScope::unboundedSynchronousIPCCount = 0;
55
56 struct Connection::WaitForMessageState {
57     WaitForMessageState(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, OptionSet<WaitForOption> waitForOptions)
58         : messageReceiverName(messageReceiverName)
59         , messageName(messageName)
60         , destinationID(destinationID)
61         , waitForOptions(waitForOptions)
62     {
63     }
64
65     StringReference messageReceiverName;
66     StringReference messageName;
67     uint64_t destinationID;
68
69     OptionSet<WaitForOption> waitForOptions;
70     bool messageWaitingInterrupted = false;
71
72     std::unique_ptr<Decoder> decoder;
73 };
74
75 class Connection::SyncMessageState {
76 public:
77     static SyncMessageState& singleton();
78
79     ~SyncMessageState() = delete;
80
81     void wakeUpClientRunLoop()
82     {
83         m_waitForSyncReplySemaphore.signal();
84     }
85
86     bool wait(TimeWithDynamicClockType absoluteTime)
87     {
88         return m_waitForSyncReplySemaphore.waitUntil(absoluteTime);
89     }
90
91     // Returns true if this message will be handled on a client thread that is currently
92     // waiting for a reply to a synchronous message.
93     bool processIncomingMessage(Connection&, std::unique_ptr<Decoder>&);
94
95     // Dispatch pending sync messages.
96     void dispatchMessages();
97
98 private:
99     friend class LazyNeverDestroyed<Connection::SyncMessageState>;
100     SyncMessageState() = default;
101
102     // Dispatch pending sync messages for given connection.
103     void dispatchMessagesAndResetDidScheduleDispatchMessagesForConnection(Connection&);
104
105     BinarySemaphore m_waitForSyncReplySemaphore;
106
107     // Protects m_didScheduleDispatchMessagesWorkSet and m_messagesToDispatchWhileWaitingForSyncReply.
108     Lock m_mutex;
109
110     // The set of connections for which we've scheduled a call to dispatchMessageAndResetDidScheduleDispatchMessagesForConnection.
111     HashSet<RefPtr<Connection>> m_didScheduleDispatchMessagesWorkSet;
112
113     struct ConnectionAndIncomingMessage {
114         Ref<Connection> connection;
115         std::unique_ptr<Decoder> message;
116
117         void dispatch()
118         {
119             connection->dispatchMessage(WTFMove(message));
120         }
121     };
122     Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
123 };
124
125 Connection::SyncMessageState& Connection::SyncMessageState::singleton()
126 {
127     static std::once_flag onceFlag;
128     static LazyNeverDestroyed<SyncMessageState> syncMessageState;
129
130     std::call_once(onceFlag, [] {
131         syncMessageState.construct();
132     });
133
134     return syncMessageState;
135 }
136
137 bool Connection::SyncMessageState::processIncomingMessage(Connection& connection, std::unique_ptr<Decoder>& message)
138 {
139     switch (message->shouldDispatchMessageWhenWaitingForSyncReply()) {
140     case ShouldDispatchWhenWaitingForSyncReply::No:
141         return false;
142     case ShouldDispatchWhenWaitingForSyncReply::YesDuringUnboundedIPC:
143         if (!UnboundedSynchronousIPCScope::hasOngoingUnboundedSyncIPC())
144             return false;
145         break;
146     case ShouldDispatchWhenWaitingForSyncReply::Yes:
147         break;
148     }
149
150     bool shouldDispatch;
151     {
152         auto locker = holdLock(m_mutex);
153         shouldDispatch = m_didScheduleDispatchMessagesWorkSet.add(&connection).isNewEntry;
154         m_messagesToDispatchWhileWaitingForSyncReply.append(ConnectionAndIncomingMessage { connection, WTFMove(message) });
155     }
156
157     if (shouldDispatch) {
158         RunLoop::main().dispatch([this, protectedConnection = makeRef(connection)]() mutable {
159             dispatchMessagesAndResetDidScheduleDispatchMessagesForConnection(protectedConnection);
160         });
161     }
162
163     wakeUpClientRunLoop();
164
165     return true;
166 }
167
168 void Connection::SyncMessageState::dispatchMessages()
169 {
170     ASSERT(RunLoop::isMain());
171
172     Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
173     {
174         auto locker = holdLock(m_mutex);
175         m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
176     }
177
178     for (auto& connectionAndIncomingMessage : messagesToDispatchWhileWaitingForSyncReply)
179         connectionAndIncomingMessage.dispatch();
180 }
181
182 void Connection::SyncMessageState::dispatchMessagesAndResetDidScheduleDispatchMessagesForConnection(Connection& connection)
183 {
184     ASSERT(RunLoop::isMain());
185
186     Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
187     {
188         auto locker = holdLock(m_mutex);
189         ASSERT(m_didScheduleDispatchMessagesWorkSet.contains(&connection));
190         m_didScheduleDispatchMessagesWorkSet.remove(&connection);
191         m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
192     }
193
194     Vector<ConnectionAndIncomingMessage> messagesToPutBack;
195     for (auto& connectionAndIncomingMessage : messagesToDispatchWhileWaitingForSyncReply) {
196         if (&connection == connectionAndIncomingMessage.connection.ptr())
197             connectionAndIncomingMessage.dispatch();
198         else
199             messagesToPutBack.append(WTFMove(connectionAndIncomingMessage));
200     }
201
202     if (!messagesToPutBack.isEmpty()) {
203         auto locker = holdLock(m_mutex);
204         messagesToPutBack.appendVector(WTFMove(m_messagesToDispatchWhileWaitingForSyncReply));
205         m_messagesToDispatchWhileWaitingForSyncReply = WTFMove(messagesToPutBack);
206     }
207 }
208
209 // Represents a sync request for which we're waiting on a reply.
210 struct Connection::PendingSyncReply {
211     // The request ID.
212     uint64_t syncRequestID { 0 };
213
214     // The reply decoder, will be null if there was an error processing the sync
215     // message on the other side.
216     std::unique_ptr<Decoder> replyDecoder;
217
218     // Will be set to true once a reply has been received.
219     bool didReceiveReply { false };
220
221     PendingSyncReply() = default;
222
223     explicit PendingSyncReply(uint64_t syncRequestID)
224         : syncRequestID(syncRequestID)
225     {
226     }
227 };
228
229 Ref<Connection> Connection::createServerConnection(Identifier identifier, Client& client)
230 {
231     return adoptRef(*new Connection(identifier, true, client));
232 }
233
234 Ref<Connection> Connection::createClientConnection(Identifier identifier, Client& client)
235 {
236     return adoptRef(*new Connection(identifier, false, client));
237 }
238
239 static HashMap<IPC::Connection::UniqueID, Connection*>& allConnections()
240 {
241     static NeverDestroyed<HashMap<IPC::Connection::UniqueID, Connection*>> map;
242     return map;
243 }
244
245 static Lock& asyncReplyHandlerMapLock()
246 {
247     static Lock lock;
248     return lock;
249 }
250
251 static HashMap<uintptr_t, HashMap<uint64_t, CompletionHandler<void(Decoder*)>>>& asyncReplyHandlerMap(const LockHolder&)
252 {
253     ASSERT(asyncReplyHandlerMapLock().isHeld());
254     static NeverDestroyed<HashMap<uintptr_t, HashMap<uint64_t, CompletionHandler<void(Decoder*)>>>> map;
255     return map.get();
256 }
257
258 static void clearAsyncReplyHandlers(const Connection&);
259
260 Connection::Connection(Identifier identifier, bool isServer, Client& client)
261     : m_client(client)
262     , m_uniqueID(UniqueID::generate())
263     , m_isServer(isServer)
264     , m_syncRequestID(0)
265     , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
266     , m_shouldExitOnSyncMessageSendFailure(false)
267     , m_didCloseOnConnectionWorkQueueCallback(0)
268     , m_isConnected(false)
269     , m_connectionQueue(WorkQueue::create("com.apple.IPC.ReceiveQueue"))
270     , m_inSendSyncCount(0)
271     , m_inDispatchMessageCount(0)
272     , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
273     , m_didReceiveInvalidMessage(false)
274     , m_shouldWaitForSyncReplies(true)
275     , m_shouldWaitForMessages(true)
276 {
277     ASSERT(RunLoop::isMain());
278     allConnections().add(m_uniqueID, this);
279
280     platformInitialize(identifier);
281
282 #if HAVE(QOS_CLASSES)
283     ASSERT(pthread_main_np());
284     m_mainThread = pthread_self();
285 #endif
286 }
287
288 Connection::~Connection()
289 {
290     ASSERT(RunLoop::isMain());
291     ASSERT(!isValid());
292
293     allConnections().remove(m_uniqueID);
294
295     clearAsyncReplyHandlers(*this);
296 }
297
298 Connection* Connection::connection(UniqueID uniqueID)
299 {
300     ASSERT(RunLoop::isMain());
301     return allConnections().get(uniqueID);
302 }
303
304 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
305 {
306     ASSERT(!m_isConnected);
307
308     m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
309 }
310
311 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
312 {
313     ASSERT(!m_isConnected);
314
315     m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
316 }
317
318 void Connection::addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue& workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver)
319 {
320     ASSERT(RunLoop::isMain());
321
322     auto locker = holdLock(m_workQueueMessageReceiversMutex);
323     ASSERT(!m_workQueueMessageReceivers.contains(messageReceiverName));
324
325     m_workQueueMessageReceivers.add(messageReceiverName, std::make_pair(&workQueue, workQueueMessageReceiver));
326 }
327
328 void Connection::removeWorkQueueMessageReceiver(StringReference messageReceiverName)
329 {
330     ASSERT(RunLoop::isMain());
331
332     auto locker = holdLock(m_workQueueMessageReceiversMutex);
333     ASSERT(m_workQueueMessageReceivers.contains(messageReceiverName));
334     m_workQueueMessageReceivers.remove(messageReceiverName);
335 }
336
337 void Connection::dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver& workQueueMessageReceiver, Decoder& decoder)
338 {
339     if (!decoder.isSyncMessage()) {
340         workQueueMessageReceiver.didReceiveMessage(*this, decoder);
341         return;
342     }
343
344     uint64_t syncRequestID = 0;
345     if (!decoder.decode(syncRequestID) || !syncRequestID) {
346         // We received an invalid sync message.
347         // FIXME: Handle this.
348         decoder.markInvalid();
349         return;
350     }
351
352     auto replyEncoder = makeUnique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
353
354     // Hand off both the decoder and encoder to the work queue message receiver.
355     workQueueMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder);
356
357     // FIXME: If the message was invalid, we should send back a SyncMessageError.
358     ASSERT(!decoder.isInvalid());
359
360     if (replyEncoder)
361         sendSyncReply(WTFMove(replyEncoder));
362 }
363
364 void Connection::addThreadMessageReceiver(StringReference messageReceiverName, ThreadMessageReceiver* threadMessageReceiver)
365 {
366     ASSERT(RunLoop::isMain());
367
368     auto locker = holdLock(m_threadMessageReceiversLock);
369     ASSERT(!m_threadMessageReceivers.contains(messageReceiverName));
370
371     m_threadMessageReceivers.add(messageReceiverName, threadMessageReceiver);
372 }
373
374 void Connection::removeThreadMessageReceiver(StringReference messageReceiverName)
375 {
376     ASSERT(RunLoop::isMain());
377
378     auto locker = holdLock(m_threadMessageReceiversLock);
379     ASSERT(m_threadMessageReceivers.contains(messageReceiverName));
380
381     m_threadMessageReceivers.remove(messageReceiverName);
382 }
383
384 void Connection::dispatchThreadMessageReceiverMessage(ThreadMessageReceiver& threadMessageReceiver, Decoder& decoder)
385 {
386     if (!decoder.isSyncMessage()) {
387         threadMessageReceiver.didReceiveMessage(*this, decoder);
388         return;
389     }
390
391     uint64_t syncRequestID = 0;
392     if (!decoder.decode(syncRequestID) || !syncRequestID) {
393         // FIXME: Handle invalid sync message.
394         decoder.markInvalid();
395         return;
396     }
397
398     auto replyEncoder = makeUnique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
399     threadMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder);
400
401     // FIXME: If the message was invalid, we should send back a SyncMessageError.
402     ASSERT(!decoder.isInvalid());
403
404     if (replyEncoder)
405         sendSyncReply(WTFMove(replyEncoder));
406 }
407
408 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
409 {
410     ASSERT(!m_isConnected);
411
412     m_didCloseOnConnectionWorkQueueCallback = callback;    
413 }
414
415 void Connection::invalidate()
416 {
417     ASSERT(RunLoop::isMain());
418
419     if (!isValid()) {
420         // Someone already called invalidate().
421         return;
422     }
423     
424     m_isValid = false;
425
426     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
427         protectedThis->platformInvalidate();
428     });
429 }
430
431 void Connection::markCurrentlyDispatchedMessageAsInvalid()
432 {
433     // This should only be called while processing a message.
434     ASSERT(m_inDispatchMessageCount > 0);
435
436     m_didReceiveInvalidMessage = true;
437 }
438
439 std::unique_ptr<Encoder> Connection::createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID)
440 {
441     auto encoder = makeUnique<Encoder>(messageReceiverName, messageName, destinationID);
442     encoder->setIsSyncMessage(true);
443
444     // Encode the sync request ID.
445     syncRequestID = ++m_syncRequestID;
446     *encoder << syncRequestID;
447
448     return encoder;
449 }
450
451 bool Connection::sendMessage(std::unique_ptr<Encoder> encoder, OptionSet<SendOption> sendOptions)
452 {
453     if (!isValid())
454         return false;
455
456     if (isMainThread() && m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting && !encoder->isSyncMessage() && !(encoder->messageReceiverName() == "IPC") && !sendOptions.contains(SendOption::IgnoreFullySynchronousMode)) {
457         uint64_t syncRequestID;
458         auto wrappedMessage = createSyncMessageEncoder("IPC", "WrappedAsyncMessageForTesting", encoder->destinationID(), syncRequestID);
459         wrappedMessage->setFullySynchronousModeForTesting();
460         wrappedMessage->wrapForTesting(WTFMove(encoder));
461         return static_cast<bool>(sendSyncMessage(syncRequestID, WTFMove(wrappedMessage), Seconds::infinity(), { }));
462     }
463
464     if (sendOptions.contains(SendOption::DispatchMessageEvenWhenWaitingForSyncReply)
465         && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
466             || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
467         encoder->setShouldDispatchMessageWhenWaitingForSyncReply(ShouldDispatchWhenWaitingForSyncReply::Yes);
468     else if (sendOptions.contains(SendOption::DispatchMessageEvenWhenWaitingForUnboundedSyncReply))
469         encoder->setShouldDispatchMessageWhenWaitingForSyncReply(ShouldDispatchWhenWaitingForSyncReply::YesDuringUnboundedIPC);
470
471     {
472         auto locker = holdLock(m_outgoingMessagesMutex);
473         m_outgoingMessages.append(WTFMove(encoder));
474     }
475     
476     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
477     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
478         protectedThis->sendOutgoingMessages();
479     });
480     return true;
481 }
482
483 bool Connection::sendSyncReply(std::unique_ptr<Encoder> encoder)
484 {
485     return sendMessage(WTFMove(encoder), { });
486 }
487
488 Seconds Connection::timeoutRespectingIgnoreTimeoutsForTesting(Seconds timeout) const
489 {
490     return m_ignoreTimeoutsForTesting ? Seconds::infinity() : timeout;
491 }
492
493 std::unique_ptr<Decoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions)
494 {
495     ASSERT(RunLoop::isMain());
496     auto protectedThis = makeRef(*this);
497
498     timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
499
500     bool hasIncomingSynchronousMessage = false;
501
502     // First, check if this message is already in the incoming messages queue.
503     {
504         auto locker = holdLock(m_incomingMessagesMutex);
505
506         for (auto it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) {
507             std::unique_ptr<Decoder>& message = *it;
508
509             if (message->messageReceiverName() == messageReceiverName && message->messageName() == messageName && message->destinationID() == destinationID) {
510                 std::unique_ptr<Decoder> returnedMessage = WTFMove(message);
511
512                 m_incomingMessages.remove(it);
513                 return returnedMessage;
514             }
515
516             if (message->isSyncMessage())
517                 hasIncomingSynchronousMessage = true;
518         }
519     }
520
521     // Don't even start waiting if we have InterruptWaitingIfSyncMessageArrives and there's a sync message already in the queue.
522     if (hasIncomingSynchronousMessage && waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives)) {
523 #if ASSERT_ENABLED
524         auto locker = holdLock(m_waitForMessageMutex);
525         // We don't support having multiple clients waiting for messages.
526         ASSERT(!m_waitingForMessage);
527 #endif
528         return nullptr;
529     }
530
531     WaitForMessageState waitingForMessage(messageReceiverName, messageName, destinationID, waitForOptions);
532
533     {
534         auto locker = holdLock(m_waitForMessageMutex);
535
536         // We don't support having multiple clients waiting for messages.
537         ASSERT(!m_waitingForMessage);
538         if (m_waitingForMessage)
539             return nullptr;
540
541         // If the connection is already invalidated, don't even start waiting.
542         // Once m_waitingForMessage is set, messageWaitingInterrupted will cover this instead.
543         if (!m_shouldWaitForMessages)
544             return nullptr;
545
546         m_waitingForMessage = &waitingForMessage;
547     }
548
549     MonotonicTime absoluteTimeout = MonotonicTime::now() + timeout;
550
551     // Now wait for it to be set.
552     while (true) {
553         // Handle any messages that are blocked on a response from us.
554         SyncMessageState::singleton().dispatchMessages();
555
556         std::unique_lock<Lock> lock(m_waitForMessageMutex);
557
558         if (m_waitingForMessage->decoder) {
559             auto decoder = WTFMove(m_waitingForMessage->decoder);
560             m_waitingForMessage = nullptr;
561             return decoder;
562         }
563
564         // Now we wait.
565         bool didTimeout = !m_waitForMessageCondition.waitUntil(lock, absoluteTimeout);
566         // We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting.
567         if (didTimeout || m_waitingForMessage->messageWaitingInterrupted) {
568             m_waitingForMessage = nullptr;
569             break;
570         }
571     }
572
573     return nullptr;
574 }
575
576 std::unique_ptr<Decoder> Connection::sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<Encoder> encoder, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
577 {
578     ASSERT(RunLoop::isMain());
579
580     if (!isValid()) {
581         didFailToSendSyncMessage();
582         return nullptr;
583     }
584
585     // Push the pending sync reply information on our stack.
586     {
587         LockHolder locker(m_syncReplyStateMutex);
588         if (!m_shouldWaitForSyncReplies) {
589             didFailToSendSyncMessage();
590             return nullptr;
591         }
592
593         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
594     }
595
596     ++m_inSendSyncCount;
597
598     // First send the message.
599     OptionSet<SendOption> sendOptions = IPC::SendOption::DispatchMessageEvenWhenWaitingForSyncReply;
600     if (sendSyncOptions.contains(SendSyncOption::ForceDispatchWhenDestinationIsWaitingForUnboundedSyncReply))
601         sendOptions = sendOptions | IPC::SendOption::DispatchMessageEvenWhenWaitingForUnboundedSyncReply;
602
603     auto messageReceiverName = encoder->messageReceiverName();
604     auto messageName = encoder->messageName();
605     sendMessage(WTFMove(encoder), sendOptions);
606
607     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
608     // keep an extra reference to the connection here in case it's invalidated.
609     Ref<Connection> protect(*this);
610     std::unique_ptr<Decoder> reply = waitForSyncReply(syncRequestID, messageReceiverName, messageName, timeout, sendSyncOptions);
611
612     --m_inSendSyncCount;
613
614     // Finally, pop the pending sync reply information.
615     {
616         LockHolder locker(m_syncReplyStateMutex);
617         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
618         m_pendingSyncReplies.removeLast();
619     }
620
621     if (!reply)
622         didFailToSendSyncMessage();
623
624     return reply;
625 }
626
627 std::unique_ptr<Decoder> Connection::waitForSyncReply(uint64_t syncRequestID, StringReference messageReceiverName, StringReference messageName, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
628 {
629     timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
630     MonotonicTime absoluteTime = MonotonicTime::now() + timeout;
631
632     willSendSyncMessage(sendSyncOptions);
633     
634     bool timedOut = false;
635     while (!timedOut) {
636         // First, check if we have any messages that we need to process.
637         SyncMessageState::singleton().dispatchMessages();
638         
639         {
640             LockHolder locker(m_syncReplyStateMutex);
641
642             // Second, check if there is a sync reply at the top of the stack.
643             ASSERT(!m_pendingSyncReplies.isEmpty());
644             
645             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
646             ASSERT_UNUSED(syncRequestID, pendingSyncReply.syncRequestID == syncRequestID);
647             
648             // We found the sync reply, or the connection was closed.
649             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies) {
650                 didReceiveSyncReply(sendSyncOptions);
651                 return WTFMove(pendingSyncReply.replyDecoder);
652             }
653         }
654
655         // Processing a sync message could cause the connection to be invalidated.
656         // (If the handler ends up calling Connection::invalidate).
657         // If that happens, we need to stop waiting, or we'll hang since we won't get
658         // any more incoming messages.
659         if (!isValid()) {
660             RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Connection no longer valid, id = %" PRIu64, syncRequestID);
661             didReceiveSyncReply(sendSyncOptions);
662             return nullptr;
663         }
664
665         // We didn't find a sync reply yet, keep waiting.
666         // This allows the WebProcess to still serve clients while waiting for the message to return.
667         // Notably, it can continue to process accessibility requests, which are on the main thread.
668         timedOut = !SyncMessageState::singleton().wait(absoluteTime);
669     }
670
671 #if OS(DARWIN)
672     RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Timed-out while waiting for reply for %{public}s::%{public}s from process %d, id = %" PRIu64, messageReceiverName.toString().data(), messageName.toString().data(), remoteProcessID(), syncRequestID);
673 #else
674     RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Timed-out while waiting for reply for %s::%s, id = %" PRIu64, messageReceiverName.toString().data(), messageName.toString().data(), syncRequestID);
675 #endif
676
677     didReceiveSyncReply(sendSyncOptions);
678
679     return nullptr;
680 }
681
682 void Connection::processIncomingSyncReply(std::unique_ptr<Decoder> decoder)
683 {
684     {
685         LockHolder locker(m_syncReplyStateMutex);
686
687         // Go through the stack of sync requests that have pending replies and see which one
688         // this reply is for.
689         for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
690             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
691
692             if (pendingSyncReply.syncRequestID != decoder->destinationID())
693                 continue;
694
695             ASSERT(!pendingSyncReply.replyDecoder);
696
697             pendingSyncReply.replyDecoder = WTFMove(decoder);
698             pendingSyncReply.didReceiveReply = true;
699
700             // We got a reply to the last send message, wake up the client run loop so it can be processed.
701             if (i == m_pendingSyncReplies.size())
702                 SyncMessageState::singleton().wakeUpClientRunLoop();
703
704             return;
705         }
706     }
707
708     // If we get here, it means we got a reply for a message that wasn't in the sync request stack or map.
709     // This can happen if the send timed out, so it's fine to ignore.
710 }
711
712 void Connection::processIncomingMessage(std::unique_ptr<Decoder> message)
713 {
714     ASSERT(!message->messageReceiverName().isEmpty());
715     ASSERT(!message->messageName().isEmpty());
716
717     if (message->messageReceiverName() == "IPC" && message->messageName() == "SyncMessageReply") {
718         processIncomingSyncReply(WTFMove(message));
719         return;
720     }
721
722     if (!WorkQueueMessageReceiverMap::isValidKey(message->messageReceiverName()) || !ThreadMessageReceiverMap::isValidKey(message->messageReceiverName())) {
723         RefPtr<Connection> protectedThis(this);
724         StringReference messageReceiverNameReference = message->messageReceiverName();
725         String messageReceiverName(messageReceiverNameReference.isEmpty() ? "<unknown message receiver>" : String(messageReceiverNameReference.data(), messageReceiverNameReference.size()));
726         StringReference messageNameReference = message->messageName();
727         String messageName(messageNameReference.isEmpty() ? "<unknown message>" : String(messageNameReference.data(), messageNameReference.size()));
728
729         RunLoop::main().dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), messageName = WTFMove(messageName)]() mutable {
730             protectedThis->dispatchDidReceiveInvalidMessage(messageReceiverName.utf8(), messageName.utf8());
731         });
732         return;
733     }
734
735     if (dispatchMessageToWorkQueueReceiver(message))
736         return;
737
738     if (dispatchMessageToThreadReceiver(message))
739         return;
740
741 #if HAVE(QOS_CLASSES)
742     if (message->isSyncMessage() && m_shouldBoostMainThreadOnSyncMessage) {
743         pthread_override_t override = pthread_override_qos_class_start_np(m_mainThread, Thread::adjustedQOSClass(QOS_CLASS_USER_INTERACTIVE), 0);
744         message->setQOSClassOverride(override);
745     }
746 #endif
747
748     if (message->isSyncMessage()) {
749         auto locker = holdLock(m_incomingSyncMessageCallbackMutex);
750
751         for (auto& callback : m_incomingSyncMessageCallbacks.values())
752             m_incomingSyncMessageCallbackQueue->dispatch(WTFMove(callback));
753
754         m_incomingSyncMessageCallbacks.clear();
755     }
756
757     // Check if we're waiting for this message, or if we need to interrupt waiting due to an incoming sync message.
758     {
759         auto locker = holdLock(m_waitForMessageMutex);
760
761         if (m_waitingForMessage && !m_waitingForMessage->decoder) {
762             if (m_waitingForMessage->messageReceiverName == message->messageReceiverName() && m_waitingForMessage->messageName == message->messageName() && m_waitingForMessage->destinationID == message->destinationID()) {
763                 m_waitingForMessage->decoder = WTFMove(message);
764                 ASSERT(m_waitingForMessage->decoder);
765                 m_waitForMessageCondition.notifyOne();
766                 return;
767             }
768
769             if (m_waitingForMessage->waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives) && message->isSyncMessage()) {
770                 m_waitingForMessage->messageWaitingInterrupted = true;
771                 m_waitForMessageCondition.notifyOne();
772                 enqueueIncomingMessage(WTFMove(message));
773                 return;
774             }
775         }
776     }
777
778     // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
779     // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
780     // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
781     if (SyncMessageState::singleton().processIncomingMessage(*this, message))
782         return;
783
784     enqueueIncomingMessage(WTFMove(message));
785 }
786
787 uint64_t Connection::installIncomingSyncMessageCallback(WTF::Function<void ()>&& callback)
788 {
789     auto locker = holdLock(m_incomingSyncMessageCallbackMutex);
790
791     m_nextIncomingSyncMessageCallbackID++;
792
793     if (!m_incomingSyncMessageCallbackQueue)
794         m_incomingSyncMessageCallbackQueue = WorkQueue::create("com.apple.WebKit.IPC.IncomingSyncMessageCallbackQueue");
795
796     m_incomingSyncMessageCallbacks.add(m_nextIncomingSyncMessageCallbackID, WTFMove(callback));
797
798     return m_nextIncomingSyncMessageCallbackID;
799 }
800
801 void Connection::uninstallIncomingSyncMessageCallback(uint64_t callbackID)
802 {
803     auto locker = holdLock(m_incomingSyncMessageCallbackMutex);
804     m_incomingSyncMessageCallbacks.remove(callbackID);
805 }
806
807 bool Connection::hasIncomingSyncMessage()
808 {
809     auto locker = holdLock(m_incomingMessagesMutex);
810
811     for (auto& message : m_incomingMessages) {
812         if (message->isSyncMessage())
813             return true;
814     }
815     
816     return false;
817 }
818
819 void Connection::enableIncomingMessagesThrottling()
820 {
821     if (m_incomingMessagesThrottler)
822         return;
823
824     m_incomingMessagesThrottler = makeUnique<MessagesThrottler>(*this, &Connection::dispatchIncomingMessages);
825 }
826
827 void Connection::postConnectionDidCloseOnConnectionWorkQueue()
828 {
829     m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
830         protectedThis->connectionDidClose();
831     });
832 }
833
834 void Connection::connectionDidClose()
835 {
836     // The connection is now invalid.
837     platformInvalidate();
838
839     {
840         LockHolder locker(m_syncReplyStateMutex);
841
842         ASSERT(m_shouldWaitForSyncReplies);
843         m_shouldWaitForSyncReplies = false;
844
845         if (!m_pendingSyncReplies.isEmpty())
846             SyncMessageState::singleton().wakeUpClientRunLoop();
847     }
848
849     {
850         auto locker = holdLock(m_waitForMessageMutex);
851
852         ASSERT(m_shouldWaitForMessages);
853         m_shouldWaitForMessages = false;
854
855         if (m_waitingForMessage)
856             m_waitingForMessage->messageWaitingInterrupted = true;
857     }
858     m_waitForMessageCondition.notifyAll();
859
860     if (m_didCloseOnConnectionWorkQueueCallback)
861         m_didCloseOnConnectionWorkQueueCallback(this);
862
863     RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
864         // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
865         // then the connection will be invalid here.
866         if (!protectedThis->isValid())
867             return;
868
869         // Set m_isValid to false before calling didClose, otherwise, sendSync will try to send a message
870         // to the connection and will then wait indefinitely for a reply.
871         protectedThis->m_isValid = false;
872
873         protectedThis->m_client.didClose(protectedThis.get());
874
875         clearAsyncReplyHandlers(protectedThis.get());
876     });
877 }
878
879 bool Connection::canSendOutgoingMessages() const
880 {
881     return m_isConnected && platformCanSendOutgoingMessages();
882 }
883
884 void Connection::sendOutgoingMessages()
885 {
886     if (!canSendOutgoingMessages())
887         return;
888
889     while (true) {
890         std::unique_ptr<Encoder> message;
891
892         {
893             auto locker = holdLock(m_outgoingMessagesMutex);
894             if (m_outgoingMessages.isEmpty())
895                 break;
896             message = m_outgoingMessages.takeFirst();
897         }
898
899         if (!sendOutgoingMessage(WTFMove(message)))
900             break;
901     }
902 }
903
904 void Connection::dispatchSyncMessage(Decoder& decoder)
905 {
906     ASSERT(decoder.isSyncMessage());
907
908     uint64_t syncRequestID = 0;
909     if (!decoder.decode(syncRequestID) || !syncRequestID) {
910         // We received an invalid sync message.
911         decoder.markInvalid();
912         return;
913     }
914
915     auto replyEncoder = makeUnique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
916
917     if (decoder.messageReceiverName() == "IPC" && decoder.messageName() == "WrappedAsyncMessageForTesting") {
918         if (!m_fullySynchronousModeIsAllowedForTesting) {
919             decoder.markInvalid();
920             return;
921         }
922         std::unique_ptr<Decoder> unwrappedDecoder = Decoder::unwrapForTesting(decoder);
923         RELEASE_ASSERT(unwrappedDecoder);
924         processIncomingMessage(WTFMove(unwrappedDecoder));
925
926         SyncMessageState::singleton().dispatchMessages();
927     } else {
928         // Hand off both the decoder and encoder to the client.
929         m_client.didReceiveSyncMessage(*this, decoder, replyEncoder);
930     }
931
932     // FIXME: If the message was invalid, we should send back a SyncMessageError.
933     ASSERT(!decoder.isInvalid());
934
935     if (replyEncoder)
936         sendSyncReply(WTFMove(replyEncoder));
937 }
938
939 void Connection::dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString)
940 {
941     ASSERT(RunLoop::isMain());
942
943     if (!isValid())
944         return;
945
946     m_client.didReceiveInvalidMessage(*this, StringReference(messageReceiverNameString.data(), messageReceiverNameString.length()), StringReference(messageNameString.data(), messageNameString.length()));
947 }
948
949 void Connection::didFailToSendSyncMessage()
950 {
951     if (!m_shouldExitOnSyncMessageSendFailure)
952         return;
953
954     exit(0);
955 }
956
957 void Connection::enqueueIncomingMessage(std::unique_ptr<Decoder> incomingMessage)
958 {
959     {
960         auto locker = holdLock(m_incomingMessagesMutex);
961
962 #if PLATFORM(COCOA)
963         if (m_wasKilled)
964             return;
965
966         if (m_incomingMessages.size() >= maxPendingIncomingMessagesKillingThreshold) {
967             if (kill()) {
968                 RELEASE_LOG_ERROR(IPC, "%p - Connection::enqueueIncomingMessage: Over %zu incoming messages have been queued without the main thread processing them, killing the connection as the remote process seems to be misbehaving", this, maxPendingIncomingMessagesKillingThreshold);
969                 m_incomingMessages.clear();
970             }
971             return;
972         }
973 #endif
974
975         m_incomingMessages.append(WTFMove(incomingMessage));
976
977         if (m_incomingMessagesThrottler && m_incomingMessages.size() != 1)
978             return;
979     }
980
981     RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
982         if (protectedThis->m_incomingMessagesThrottler)
983             protectedThis->dispatchIncomingMessages();
984         else
985             protectedThis->dispatchOneIncomingMessage();
986     });
987 }
988
989 void Connection::dispatchMessage(Decoder& decoder)
990 {
991     RELEASE_ASSERT(isValid());
992     if (decoder.messageReceiverName() == "AsyncReply") {
993         Optional<uint64_t> listenerID;
994         decoder >> listenerID;
995         if (!listenerID) {
996             ASSERT_NOT_REACHED();
997             return;
998         }
999         auto handler = takeAsyncReplyHandler(*this, *listenerID);
1000         if (!handler) {
1001             ASSERT_NOT_REACHED();
1002             return;
1003         }
1004         handler(&decoder);
1005         return;
1006     }
1007
1008     m_client.didReceiveMessage(*this, decoder);
1009 }
1010
1011 bool Connection::dispatchMessageToWorkQueueReceiver(std::unique_ptr<Decoder>& message)
1012 {
1013     auto locker = holdLock(m_workQueueMessageReceiversMutex);
1014     auto it = m_workQueueMessageReceivers.find(message->messageReceiverName());
1015     if (it != m_workQueueMessageReceivers.end()) {
1016         it->value.first->dispatch([protectedThis = makeRef(*this), workQueueMessageReceiver = it->value.second, decoder = WTFMove(message)]() mutable {
1017             protectedThis->dispatchWorkQueueMessageReceiverMessage(*workQueueMessageReceiver, *decoder);
1018         });
1019         return true;
1020     }
1021     return false;
1022 }
1023
1024 bool Connection::dispatchMessageToThreadReceiver(std::unique_ptr<Decoder>& message)
1025 {
1026     RefPtr<ThreadMessageReceiver> protectedThreadMessageReceiver;
1027     {
1028         auto locker = holdLock(m_threadMessageReceiversLock);
1029         protectedThreadMessageReceiver = m_threadMessageReceivers.get(message->messageReceiverName());
1030     }
1031
1032     if (protectedThreadMessageReceiver) {
1033         protectedThreadMessageReceiver->dispatchToThread([protectedThis = makeRef(*this), threadMessageReceiver = WTFMove(protectedThreadMessageReceiver), decoder = WTFMove(message)]() mutable {
1034             protectedThis->dispatchThreadMessageReceiverMessage(*threadMessageReceiver, *decoder);
1035         });
1036         return true;
1037     }
1038     return false;
1039 }
1040
1041 void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
1042 {
1043     ASSERT(RunLoop::isMain());
1044     if (!isValid())
1045         return;
1046
1047     // Messages to WorkQueueMessageReceivers are normally dispatched from the IPC WorkQueue. However, there is a race if
1048     // a client adds itself as a WorkQueueMessageReceiver as a result of receiving an IPC message on the main thread.
1049     // The message might have already been dispatched from the IPC WorkQueue to the main thread by the time the
1050     // client registers itself as a WorkQueueMessageReceiver. To address this, we check again for messages receivers
1051     // once the message arrives on the main thread.
1052     if (dispatchMessageToWorkQueueReceiver(message))
1053         return;
1054
1055     if (message->shouldUseFullySynchronousModeForTesting()) {
1056         if (!m_fullySynchronousModeIsAllowedForTesting) {
1057             m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
1058             return;
1059         }
1060         m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting++;
1061     }
1062
1063     m_inDispatchMessageCount++;
1064     
1065     bool isDispatchingMessageWhileWaitingForSyncReply = (message->shouldDispatchMessageWhenWaitingForSyncReply() == ShouldDispatchWhenWaitingForSyncReply::Yes)
1066         || (message->shouldDispatchMessageWhenWaitingForSyncReply() == ShouldDispatchWhenWaitingForSyncReply::YesDuringUnboundedIPC && UnboundedSynchronousIPCScope::hasOngoingUnboundedSyncIPC());
1067
1068     if (isDispatchingMessageWhileWaitingForSyncReply)
1069         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
1070
1071     bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
1072     m_didReceiveInvalidMessage = false;
1073
1074     if (message->isSyncMessage())
1075         dispatchSyncMessage(*message);
1076     else
1077         dispatchMessage(*message);
1078
1079     m_didReceiveInvalidMessage |= message->isInvalid();
1080     m_inDispatchMessageCount--;
1081
1082     // FIXME: For synchronous messages, we should not decrement the counter until we send a response.
1083     // Otherwise, we would deadlock if processing the message results in a sync message back after we exit this function.
1084     if (isDispatchingMessageWhileWaitingForSyncReply)
1085         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
1086
1087     if (message->shouldUseFullySynchronousModeForTesting())
1088         m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting--;
1089
1090     if (m_didReceiveInvalidMessage && isValid())
1091         m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
1092
1093     m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
1094 }
1095
1096 Connection::MessagesThrottler::MessagesThrottler(Connection& connection, DispatchMessagesFunction dispatchMessages)
1097     : m_dispatchMessagesTimer(RunLoop::main(), &connection, dispatchMessages)
1098     , m_connection(connection)
1099     , m_dispatchMessages(dispatchMessages)
1100 {
1101     ASSERT(RunLoop::isMain());
1102 }
1103
1104 void Connection::MessagesThrottler::scheduleMessagesDispatch()
1105 {
1106     ASSERT(RunLoop::isMain());
1107
1108     if (m_throttlingLevel) {
1109         m_dispatchMessagesTimer.startOneShot(0_s);
1110         return;
1111     }
1112     RunLoop::main().dispatch([this, protectedConnection = makeRefPtr(&m_connection)]() mutable {
1113         (protectedConnection.get()->*m_dispatchMessages)();
1114     });
1115 }
1116
1117 size_t Connection::MessagesThrottler::numberOfMessagesToProcess(size_t totalMessages)
1118 {
1119     ASSERT(RunLoop::isMain());
1120
1121     // Never dispatch more than 600 messages without returning to the run loop, we can go as low as 60 with maximum throttling level.
1122     static const size_t maxIncomingMessagesDispatchingBatchSize { 600 };
1123     static const unsigned maxThrottlingLevel = 9;
1124
1125     size_t batchSize = maxIncomingMessagesDispatchingBatchSize / (m_throttlingLevel + 1);
1126
1127     if (totalMessages > maxIncomingMessagesDispatchingBatchSize)
1128         m_throttlingLevel = std::min(m_throttlingLevel + 1, maxThrottlingLevel);
1129     else if (m_throttlingLevel)
1130         --m_throttlingLevel;
1131
1132     return std::min(totalMessages, batchSize);
1133 }
1134
1135 void Connection::dispatchOneIncomingMessage()
1136 {
1137     std::unique_ptr<Decoder> message;
1138     {
1139         auto locker = holdLock(m_incomingMessagesMutex);
1140         if (m_incomingMessages.isEmpty())
1141             return;
1142
1143         message = m_incomingMessages.takeFirst();
1144     }
1145
1146     dispatchMessage(WTFMove(message));
1147 }
1148
1149 void Connection::dispatchIncomingMessages()
1150 {
1151     ASSERT(RunLoop::isMain());
1152
1153     std::unique_ptr<Decoder> message;
1154
1155     size_t messagesToProcess = 0;
1156     {
1157         auto locker = holdLock(m_incomingMessagesMutex);
1158         if (m_incomingMessages.isEmpty())
1159             return;
1160
1161         message = m_incomingMessages.takeFirst();
1162
1163         // Incoming messages may get adding to the queue by the IPC thread while we're dispatching the messages below.
1164         // To make sure dispatchIncomingMessages() yields, we only ever process messages that were in the queue when
1165         // dispatchIncomingMessages() was called. Additionally, the MessageThrottler may further cap the number of
1166         // messages to process to make sure we give the main run loop a chance to process other events.
1167         messagesToProcess = m_incomingMessagesThrottler->numberOfMessagesToProcess(m_incomingMessages.size());
1168         if (messagesToProcess < m_incomingMessages.size()) {
1169             RELEASE_LOG_ERROR(IPC, "%p - Connection::dispatchIncomingMessages: IPC throttling was triggered (has %zu pending incoming messages, will only process %zu before yielding)", this, m_incomingMessages.size(), messagesToProcess);
1170 #if PLATFORM(COCOA)
1171             RELEASE_LOG_ERROR(IPC, "%p - Connection::dispatchIncomingMessages: first IPC message in queue is %{public}s::%{public}s", this, message->messageReceiverName().toString().data(), message->messageName().toString().data());
1172 #endif
1173         }
1174
1175         // Re-schedule ourselves *before* we dispatch the messages because we want to process follow-up messages if the client
1176         // spins a nested run loop while we're dispatching a message. Note that this means we can re-enter this method.
1177         if (!m_incomingMessages.isEmpty())
1178             m_incomingMessagesThrottler->scheduleMessagesDispatch();
1179     }
1180
1181     dispatchMessage(WTFMove(message));
1182
1183     for (size_t i = 1; i < messagesToProcess; ++i) {
1184         {
1185             auto locker = holdLock(m_incomingMessagesMutex);
1186             if (m_incomingMessages.isEmpty())
1187                 return;
1188
1189             message = m_incomingMessages.takeFirst();
1190         }
1191         dispatchMessage(WTFMove(message));
1192     }
1193 }
1194
1195 uint64_t nextAsyncReplyHandlerID()
1196 {
1197     static std::atomic<uint64_t> identifier { 0 };
1198     return ++identifier;
1199 }
1200
1201 void addAsyncReplyHandler(Connection& connection, uint64_t identifier, CompletionHandler<void(Decoder*)>&& completionHandler)
1202 {
1203     LockHolder locker(asyncReplyHandlerMapLock());
1204     auto result = asyncReplyHandlerMap(locker).ensure(reinterpret_cast<uintptr_t>(&connection), [] {
1205         return HashMap<uint64_t, CompletionHandler<void(Decoder*)>>();
1206     }).iterator->value.add(identifier, WTFMove(completionHandler));
1207     ASSERT_UNUSED(result, result.isNewEntry);
1208 }
1209
1210 void clearAsyncReplyHandlers(const Connection& connection)
1211 {
1212     HashMap<uint64_t, CompletionHandler<void(Decoder*)>> map;
1213     {
1214         LockHolder locker(asyncReplyHandlerMapLock());
1215         map = asyncReplyHandlerMap(locker).take(reinterpret_cast<uintptr_t>(&connection));
1216     }
1217
1218     for (auto& handler : map.values()) {
1219         if (handler)
1220             handler(nullptr);
1221     }
1222 }
1223
1224 CompletionHandler<void(Decoder*)> takeAsyncReplyHandler(Connection& connection, uint64_t identifier)
1225 {
1226     LockHolder locker(asyncReplyHandlerMapLock());
1227     auto& map = asyncReplyHandlerMap(locker);
1228     auto iterator = map.find(reinterpret_cast<uintptr_t>(&connection));
1229     if (iterator != map.end()) {
1230         if (!iterator->value.isValidKey(identifier)) {
1231             ASSERT_NOT_REACHED();
1232             connection.markCurrentlyDispatchedMessageAsInvalid();
1233             return nullptr;
1234         }
1235         ASSERT(iterator->value.contains(identifier));
1236         return iterator->value.take(identifier);
1237     }
1238     ASSERT_NOT_REACHED();
1239     return nullptr;
1240 }
1241
1242 void Connection::wakeUpRunLoop()
1243 {
1244     RunLoop::main().wakeUp();
1245 }
1246
1247 } // namespace IPC