Don't use DispatchMessageEvenWhenWaitingForSyncReply for messages from NetworkProcess
[WebKit-https.git] / Source / WebKit2 / Platform / IPC / Connection.cpp
1 /*
2  * Copyright (C) 2010 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "Connection.h"
28
29 #include <memory>
30 #include <wtf/CurrentTime.h>
31 #include <wtf/HashSet.h>
32 #include <wtf/NeverDestroyed.h>
33 #include <wtf/RunLoop.h>
34 #include <wtf/text/WTFString.h>
35 #include <wtf/threads/BinarySemaphore.h>
36
37 namespace IPC {
38
39 class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
40 public:
41     static PassRefPtr<SyncMessageState> getOrCreate(RunLoop&);
42     ~SyncMessageState();
43
44     void wakeUpClientRunLoop()
45     {
46         m_waitForSyncReplySemaphore.signal();
47     }
48
49     bool wait(double absoluteTime)
50     {
51         return m_waitForSyncReplySemaphore.wait(absoluteTime);
52     }
53
54     // Returns true if this message will be handled on a client thread that is currently
55     // waiting for a reply to a synchronous message.
56     bool processIncomingMessage(Connection*, std::unique_ptr<MessageDecoder>&);
57
58     // Dispatch pending sync messages. if allowedConnection is not null, will only dispatch messages
59     // from that connection and put the other messages back in the queue.
60     void dispatchMessages(Connection* allowedConnection);
61
62 private:
63     explicit SyncMessageState(RunLoop&);
64
65     typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap;
66     static SyncMessageStateMap& syncMessageStateMap()
67     {
68         static NeverDestroyed<SyncMessageStateMap> syncMessageStateMap;
69         return syncMessageStateMap;
70     }
71
72     static Mutex& syncMessageStateMapMutex()
73     {
74         static NeverDestroyed<Mutex> syncMessageStateMapMutex;
75         return syncMessageStateMapMutex;
76     }
77
78     void dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection*);
79
80     RunLoop& m_runLoop;
81     BinarySemaphore m_waitForSyncReplySemaphore;
82
83     // Protects m_didScheduleDispatchMessagesWorkSet and m_messagesToDispatchWhileWaitingForSyncReply.
84     Mutex m_mutex;
85
86     // The set of connections for which we've scheduled a call to dispatchMessageAndResetDidScheduleDispatchMessagesForConnection.
87     HashSet<RefPtr<Connection>> m_didScheduleDispatchMessagesWorkSet;
88
89     struct ConnectionAndIncomingMessage {
90         RefPtr<Connection> connection;
91         std::unique_ptr<MessageDecoder> message;
92     };
93     Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
94 };
95
96 class Connection::SecondaryThreadPendingSyncReply {
97 public:
98     // The reply decoder, will be null if there was an error processing the sync message on the other side.
99     std::unique_ptr<MessageDecoder> replyDecoder;
100
101     BinarySemaphore semaphore;
102 };
103
104
105 PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop& runLoop)
106 {
107     MutexLocker locker(syncMessageStateMapMutex());
108     SyncMessageStateMap::AddResult result = syncMessageStateMap().add(&runLoop, nullptr);
109
110     if (!result.isNewEntry) {
111         ASSERT(result.iterator->value);
112         return result.iterator->value;
113     }
114
115     RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
116     result.iterator->value = syncMessageState.get();
117
118     return syncMessageState.release();
119 }
120
121 Connection::SyncMessageState::SyncMessageState(RunLoop& runLoop)
122     : m_runLoop(runLoop)
123 {
124 }
125
126 Connection::SyncMessageState::~SyncMessageState()
127 {
128     MutexLocker locker(syncMessageStateMapMutex());
129     
130     ASSERT(syncMessageStateMap().contains(&m_runLoop));
131     syncMessageStateMap().remove(&m_runLoop);
132
133     ASSERT(m_messagesToDispatchWhileWaitingForSyncReply.isEmpty());
134 }
135
136 bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, std::unique_ptr<MessageDecoder>& message)
137 {
138     if (!message->shouldDispatchMessageWhenWaitingForSyncReply())
139         return false;
140
141     ConnectionAndIncomingMessage connectionAndIncomingMessage;
142     connectionAndIncomingMessage.connection = connection;
143     connectionAndIncomingMessage.message = std::move(message);
144
145     {
146         MutexLocker locker(m_mutex);
147         
148         if (m_didScheduleDispatchMessagesWorkSet.add(connection).isNewEntry)
149             m_runLoop.dispatch(bind(&SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection, this, RefPtr<Connection>(connection)));
150
151         m_messagesToDispatchWhileWaitingForSyncReply.append(std::move(connectionAndIncomingMessage));
152     }
153
154     wakeUpClientRunLoop();
155
156     return true;
157 }
158
159 void Connection::SyncMessageState::dispatchMessages(Connection* allowedConnection)
160 {
161     ASSERT(&m_runLoop == &RunLoop::current());
162
163     Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
164
165     {
166         MutexLocker locker(m_mutex);
167         m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
168     }
169
170     Vector<ConnectionAndIncomingMessage> messagesToPutBack;
171
172     for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
173         ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
174
175         if (allowedConnection && allowedConnection != connectionAndIncomingMessage.connection) {
176             // This incoming message belongs to another connection and we don't want to dispatch it now
177             // so mark it to be put back in the message queue.
178             messagesToPutBack.append(std::move(connectionAndIncomingMessage));
179             continue;
180         }
181
182         connectionAndIncomingMessage.connection->dispatchMessage(std::move(connectionAndIncomingMessage.message));
183     }
184
185     if (!messagesToPutBack.isEmpty()) {
186         MutexLocker locker(m_mutex);
187
188         for (auto& message : messagesToPutBack)
189             m_messagesToDispatchWhileWaitingForSyncReply.append(std::move(message));
190     }
191 }
192
193 void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection* connection)
194 {
195     {
196         MutexLocker locker(m_mutex);
197         ASSERT(m_didScheduleDispatchMessagesWorkSet.contains(connection));
198         m_didScheduleDispatchMessagesWorkSet.remove(connection);
199     }
200
201     dispatchMessages(connection);
202 }
203
204 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop& clientRunLoop)
205 {
206     return adoptRef(new Connection(identifier, true, client, clientRunLoop));
207 }
208
209 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop& clientRunLoop)
210 {
211     return adoptRef(new Connection(identifier, false, client, clientRunLoop));
212 }
213
214 Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop& clientRunLoop)
215     : m_client(client)
216     , m_isServer(isServer)
217     , m_syncRequestID(0)
218     , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
219     , m_shouldExitOnSyncMessageSendFailure(false)
220     , m_didCloseOnConnectionWorkQueueCallback(0)
221     , m_isConnected(false)
222     , m_connectionQueue(WorkQueue::create("com.apple.IPC.ReceiveQueue"))
223     , m_clientRunLoop(clientRunLoop)
224     , m_inSendSyncCount(0)
225     , m_inDispatchMessageCount(0)
226     , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
227     , m_didReceiveInvalidMessage(false)
228     , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
229     , m_shouldWaitForSyncReplies(true)
230 {
231     ASSERT(m_client);
232
233     platformInitialize(identifier);
234 }
235
236 Connection::~Connection()
237 {
238     ASSERT(!isValid());
239 }
240
241 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
242 {
243     ASSERT(!m_isConnected);
244
245     m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
246 }
247
248 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
249 {
250     ASSERT(!m_isConnected);
251
252     m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
253 }
254
255 void Connection::addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue* workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver)
256 {
257     ASSERT(&RunLoop::current() == &m_clientRunLoop);
258     ASSERT(!m_isConnected);
259
260     RefPtr<Connection> connection(this);
261     m_connectionQueue->dispatch([connection, messageReceiverName, workQueue, workQueueMessageReceiver] {
262         ASSERT(!connection->m_workQueueMessageReceivers.contains(messageReceiverName));
263
264         connection->m_workQueueMessageReceivers.add(messageReceiverName, std::make_pair(workQueue, workQueueMessageReceiver));
265     });
266 }
267
268 void Connection::removeWorkQueueMessageReceiver(StringReference messageReceiverName)
269 {
270     ASSERT(&RunLoop::current() == &m_clientRunLoop);
271
272     RefPtr<Connection> connection(this);
273     m_connectionQueue->dispatch([connection, messageReceiverName] {
274         ASSERT(connection->m_workQueueMessageReceivers.contains(messageReceiverName));
275         connection->m_workQueueMessageReceivers.remove(messageReceiverName);
276     });
277 }
278
279 void Connection::dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver* workQueueMessageReceiver, MessageDecoder* incomingMessageDecoder)
280 {
281     std::unique_ptr<MessageDecoder> decoder(incomingMessageDecoder);
282
283     if (!decoder->isSyncMessage()) {
284         workQueueMessageReceiver->didReceiveMessage(this, *decoder);
285         return;
286     }
287
288     uint64_t syncRequestID = 0;
289     if (!decoder->decode(syncRequestID) || !syncRequestID) {
290         // We received an invalid sync message.
291         // FIXME: Handle this.
292         decoder->markInvalid();
293         return;
294     }
295
296     auto replyEncoder = std::make_unique<MessageEncoder>("IPC", "SyncMessageReply", syncRequestID);
297
298     // Hand off both the decoder and encoder to the work queue message receiver.
299     workQueueMessageReceiver->didReceiveSyncMessage(this, *decoder, replyEncoder);
300
301     // FIXME: If the message was invalid, we should send back a SyncMessageError.
302     ASSERT(!decoder->isInvalid());
303
304     if (replyEncoder)
305         sendSyncReply(std::move(replyEncoder));
306 }
307
308 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
309 {
310     ASSERT(!m_isConnected);
311
312     m_didCloseOnConnectionWorkQueueCallback = callback;    
313 }
314
315 void Connection::invalidate()
316 {
317     if (!isValid()) {
318         // Someone already called invalidate().
319         return;
320     }
321     
322     // Reset the client.
323     m_client = 0;
324
325     m_connectionQueue->dispatch(WTF::bind(&Connection::platformInvalidate, this));
326 }
327
328 void Connection::markCurrentlyDispatchedMessageAsInvalid()
329 {
330     // This should only be called while processing a message.
331     ASSERT(m_inDispatchMessageCount > 0);
332
333     m_didReceiveInvalidMessage = true;
334 }
335
336 std::unique_ptr<MessageEncoder> Connection::createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID)
337 {
338     auto encoder = std::make_unique<MessageEncoder>(messageReceiverName, messageName, destinationID);
339     encoder->setIsSyncMessage(true);
340
341     // Encode the sync request ID.
342     syncRequestID = ++m_syncRequestID;
343     *encoder << syncRequestID;
344
345     return encoder;
346 }
347
348 bool Connection::sendMessage(std::unique_ptr<MessageEncoder> encoder, unsigned messageSendFlags)
349 {
350     if (!isValid())
351         return false;
352
353     if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply
354         && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
355             || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
356         encoder->setShouldDispatchMessageWhenWaitingForSyncReply(true);
357
358     {
359         MutexLocker locker(m_outgoingMessagesLock);
360         m_outgoingMessages.append(std::move(encoder));
361     }
362     
363     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
364     m_connectionQueue->dispatch(WTF::bind(&Connection::sendOutgoingMessages, this));
365     return true;
366 }
367
368 bool Connection::sendSyncReply(std::unique_ptr<MessageEncoder> encoder)
369 {
370     return sendMessage(std::move(encoder));
371 }
372
373 std::unique_ptr<MessageDecoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout)
374 {
375     // First, check if this message is already in the incoming messages queue.
376     {
377         MutexLocker locker(m_incomingMessagesLock);
378
379         for (auto it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) {
380             std::unique_ptr<MessageDecoder>& message = *it;
381
382             if (message->messageReceiverName() == messageReceiverName && message->messageName() == messageName && message->destinationID() == destinationID) {
383                 std::unique_ptr<MessageDecoder> returnedMessage = std::move(message);
384
385                 m_incomingMessages.remove(it);
386                 return returnedMessage;
387             }
388         }
389     }
390
391     std::pair<std::pair<StringReference, StringReference>, uint64_t> messageAndDestination(std::make_pair(std::make_pair(messageReceiverName, messageName), destinationID));
392     
393     {
394         std::lock_guard<std::mutex> lock(m_waitForMessageMutex);
395
396         // We don't support having multiple clients wait for the same message.
397         ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
398     
399         // Insert our pending wait.
400         m_waitForMessageMap.set(messageAndDestination, nullptr);
401     }
402     
403     // Now wait for it to be set.
404     while (true) {
405         std::unique_lock<std::mutex> lock(m_waitForMessageMutex);
406
407         auto it = m_waitForMessageMap.find(messageAndDestination);
408         if (it->value) {
409             std::unique_ptr<MessageDecoder> decoder = std::move(it->value);
410             m_waitForMessageMap.remove(it);
411
412             return decoder;
413         }
414
415         // Now we wait.
416         if (m_waitForMessageCondition.wait_for(lock, timeout) == std::cv_status::timeout) {
417             // We timed out, now remove the pending wait.
418             m_waitForMessageMap.remove(messageAndDestination);
419
420             break;
421         }
422     }
423
424     return nullptr;
425 }
426
427 std::unique_ptr<MessageDecoder> Connection::sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<MessageEncoder> encoder, std::chrono::milliseconds timeout, unsigned syncSendFlags)
428 {
429     if (&RunLoop::current() != &m_clientRunLoop) {
430         // No flags are supported for synchronous messages sent from secondary threads.
431         ASSERT(!syncSendFlags);
432         return sendSyncMessageFromSecondaryThread(syncRequestID, std::move(encoder), timeout);
433     }
434
435     if (!isValid()) {
436         didFailToSendSyncMessage();
437         return nullptr;
438     }
439
440     // Push the pending sync reply information on our stack.
441     {
442         MutexLocker locker(m_syncReplyStateMutex);
443         if (!m_shouldWaitForSyncReplies) {
444             didFailToSendSyncMessage();
445             return nullptr;
446         }
447
448         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
449     }
450
451     ++m_inSendSyncCount;
452
453     // First send the message.
454     sendMessage(std::move(encoder), DispatchMessageEvenWhenWaitingForSyncReply);
455
456     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
457     // keep an extra reference to the connection here in case it's invalidated.
458     Ref<Connection> protect(*this);
459     std::unique_ptr<MessageDecoder> reply = waitForSyncReply(syncRequestID, timeout, syncSendFlags);
460
461     --m_inSendSyncCount;
462
463     // Finally, pop the pending sync reply information.
464     {
465         MutexLocker locker(m_syncReplyStateMutex);
466         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
467         m_pendingSyncReplies.removeLast();
468     }
469
470     if (!reply)
471         didFailToSendSyncMessage();
472
473     return reply;
474 }
475
476 std::unique_ptr<MessageDecoder> Connection::sendSyncMessageFromSecondaryThread(uint64_t syncRequestID, std::unique_ptr<MessageEncoder> encoder, std::chrono::milliseconds timeout)
477 {
478     ASSERT(&RunLoop::current() != &m_clientRunLoop);
479
480     if (!isValid())
481         return nullptr;
482
483     SecondaryThreadPendingSyncReply pendingReply;
484
485     // Push the pending sync reply information on our stack.
486     {
487         MutexLocker locker(m_syncReplyStateMutex);
488         if (!m_shouldWaitForSyncReplies)
489             return nullptr;
490
491         ASSERT(!m_secondaryThreadPendingSyncReplyMap.contains(syncRequestID));
492         m_secondaryThreadPendingSyncReplyMap.add(syncRequestID, &pendingReply);
493     }
494
495     sendMessage(std::move(encoder), 0);
496
497     pendingReply.semaphore.wait(currentTime() + (timeout.count() / 1000.0));
498
499     // Finally, pop the pending sync reply information.
500     {
501         MutexLocker locker(m_syncReplyStateMutex);
502         ASSERT(m_secondaryThreadPendingSyncReplyMap.contains(syncRequestID));
503         m_secondaryThreadPendingSyncReplyMap.remove(syncRequestID);
504     }
505
506     return std::move(pendingReply.replyDecoder);
507 }
508
509 std::unique_ptr<MessageDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, std::chrono::milliseconds timeout, unsigned syncSendFlags)
510 {
511     double absoluteTime = currentTime() + (timeout.count() / 1000.0);
512
513     bool timedOut = false;
514     while (!timedOut) {
515         // First, check if we have any messages that we need to process.
516         m_syncMessageState->dispatchMessages(0);
517         
518         {
519             MutexLocker locker(m_syncReplyStateMutex);
520
521             // Second, check if there is a sync reply at the top of the stack.
522             ASSERT(!m_pendingSyncReplies.isEmpty());
523             
524             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
525             ASSERT_UNUSED(syncRequestID, pendingSyncReply.syncRequestID == syncRequestID);
526             
527             // We found the sync reply, or the connection was closed.
528             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
529                 return std::move(pendingSyncReply.replyDecoder);
530         }
531
532         // Processing a sync message could cause the connection to be invalidated.
533         // (If the handler ends up calling Connection::invalidate).
534         // If that happens, we need to stop waiting, or we'll hang since we won't get
535         // any more incoming messages.
536         if (!isValid())
537             return nullptr;
538
539         // We didn't find a sync reply yet, keep waiting.
540         // This allows the WebProcess to still serve clients while waiting for the message to return.
541         // Notably, it can continue to process accessibility requests, which are on the main thread.
542         if (syncSendFlags & SpinRunLoopWhileWaitingForReply) {
543 #if PLATFORM(COCOA)
544             // FIXME: Although we run forever, any events incoming will cause us to drop out and exit out. This however doesn't
545             // account for a timeout value passed in. Timeout is always NoTimeout in these cases, but that could change.
546             RunLoop::current().runForDuration(1e10);
547             timedOut = currentTime() >= absoluteTime;
548 #endif
549         } else
550             timedOut = !m_syncMessageState->wait(absoluteTime);
551         
552     }
553
554     return nullptr;
555 }
556
557 void Connection::processIncomingSyncReply(std::unique_ptr<MessageDecoder> decoder)
558 {
559     MutexLocker locker(m_syncReplyStateMutex);
560
561     // Go through the stack of sync requests that have pending replies and see which one
562     // this reply is for.
563     for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
564         PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
565
566         if (pendingSyncReply.syncRequestID != decoder->destinationID())
567             continue;
568
569         ASSERT(!pendingSyncReply.replyDecoder);
570
571         pendingSyncReply.replyDecoder = std::move(decoder);
572         pendingSyncReply.didReceiveReply = true;
573
574         // We got a reply to the last send message, wake up the client run loop so it can be processed.
575         if (i == m_pendingSyncReplies.size())
576             m_syncMessageState->wakeUpClientRunLoop();
577
578         return;
579     }
580
581     // If it's not a reply to any primary thread message, check if it is a reply to a secondary thread one.
582     SecondaryThreadPendingSyncReplyMap::iterator secondaryThreadReplyMapItem = m_secondaryThreadPendingSyncReplyMap.find(decoder->destinationID());
583     if (secondaryThreadReplyMapItem != m_secondaryThreadPendingSyncReplyMap.end()) {
584         SecondaryThreadPendingSyncReply* reply = secondaryThreadReplyMapItem->value;
585         ASSERT(!reply->replyDecoder);
586         reply->replyDecoder = std::move(decoder);
587         reply->semaphore.signal();
588     }
589
590     // If we get here, it means we got a reply for a message that wasn't in the sync request stack or map.
591     // This can happen if the send timed out, so it's fine to ignore.
592 }
593
594 void Connection::processIncomingMessage(std::unique_ptr<MessageDecoder> message)
595 {
596     ASSERT(!message->messageReceiverName().isEmpty());
597     ASSERT(!message->messageName().isEmpty());
598
599     if (message->messageReceiverName() == "IPC" && message->messageName() == "SyncMessageReply") {
600         processIncomingSyncReply(std::move(message));
601         return;
602     }
603
604     if (!m_workQueueMessageReceivers.isValidKey(message->messageReceiverName())) {
605         if (message->messageReceiverName().isEmpty() && message->messageName().isEmpty()) {
606             // Something went wrong when decoding the message. Encode the message length so we can figure out if this
607             // happens for certain message lengths.
608             CString messageReceiverName = "<unknown message>";
609             CString messageName = String::format("<message length: %zu bytes>", message->length()).utf8();
610
611             m_clientRunLoop.dispatch(bind(&Connection::dispatchDidReceiveInvalidMessage, this, messageReceiverName, messageName));
612             return;
613         }
614
615         m_clientRunLoop.dispatch(bind(&Connection::dispatchDidReceiveInvalidMessage, this, message->messageReceiverName().toString(), message->messageName().toString()));
616         return;
617     }
618
619     auto it = m_workQueueMessageReceivers.find(message->messageReceiverName());
620     if (it != m_workQueueMessageReceivers.end()) {
621         it->value.first->dispatch(bind(&Connection::dispatchWorkQueueMessageReceiverMessage, this, it->value.second, message.release()));
622         return;
623     }
624
625     // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
626     // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
627     // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
628     if (m_syncMessageState->processIncomingMessage(this, message))
629         return;
630
631     // Check if we're waiting for this message.
632     {
633         std::lock_guard<std::mutex> lock(m_waitForMessageMutex);
634
635         auto it = m_waitForMessageMap.find(std::make_pair(std::make_pair(message->messageReceiverName(), message->messageName()), message->destinationID()));
636         if (it != m_waitForMessageMap.end()) {
637             it->value = std::move(message);
638             ASSERT(it->value);
639         
640             m_waitForMessageCondition.notify_one();
641             return;
642         }
643     }
644
645     enqueueIncomingMessage(std::move(message));
646 }
647
648 void Connection::postConnectionDidCloseOnConnectionWorkQueue()
649 {
650     m_connectionQueue->dispatch(WTF::bind(&Connection::connectionDidClose, this));
651 }
652
653 void Connection::connectionDidClose()
654 {
655     // The connection is now invalid.
656     platformInvalidate();
657
658     {
659         MutexLocker locker(m_syncReplyStateMutex);
660
661         ASSERT(m_shouldWaitForSyncReplies);
662         m_shouldWaitForSyncReplies = false;
663
664         if (!m_pendingSyncReplies.isEmpty())
665             m_syncMessageState->wakeUpClientRunLoop();
666
667         for (SecondaryThreadPendingSyncReplyMap::iterator iter = m_secondaryThreadPendingSyncReplyMap.begin(); iter != m_secondaryThreadPendingSyncReplyMap.end(); ++iter)
668             iter->value->semaphore.signal();
669     }
670
671     if (m_didCloseOnConnectionWorkQueueCallback)
672         m_didCloseOnConnectionWorkQueueCallback(this);
673
674     m_clientRunLoop.dispatch(WTF::bind(&Connection::dispatchConnectionDidClose, this));
675 }
676
677 void Connection::dispatchConnectionDidClose()
678 {
679     // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
680     // then the client will be null here.
681     if (!m_client)
682         return;
683
684     // Because we define a connection as being "valid" based on wheter it has a null client, we null out
685     // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
686     // will then wait indefinitely for a reply.
687     Client* client = m_client;
688     m_client = 0;
689     
690     client->didClose(this);
691 }
692
693 bool Connection::canSendOutgoingMessages() const
694 {
695     return m_isConnected && platformCanSendOutgoingMessages();
696 }
697
698 void Connection::sendOutgoingMessages()
699 {
700     if (!canSendOutgoingMessages())
701         return;
702
703     while (true) {
704         std::unique_ptr<MessageEncoder> message;
705
706         {
707             MutexLocker locker(m_outgoingMessagesLock);
708             if (m_outgoingMessages.isEmpty())
709                 break;
710             message = m_outgoingMessages.takeFirst();
711         }
712
713         if (!sendOutgoingMessage(std::move(message)))
714             break;
715     }
716 }
717
718 void Connection::dispatchSyncMessage(MessageDecoder& decoder)
719 {
720     ASSERT(decoder.isSyncMessage());
721
722     uint64_t syncRequestID = 0;
723     if (!decoder.decode(syncRequestID) || !syncRequestID) {
724         // We received an invalid sync message.
725         decoder.markInvalid();
726         return;
727     }
728
729     auto replyEncoder = std::make_unique<MessageEncoder>("IPC", "SyncMessageReply", syncRequestID);
730
731     // Hand off both the decoder and encoder to the client.
732     m_client->didReceiveSyncMessage(this, decoder, replyEncoder);
733
734     // FIXME: If the message was invalid, we should send back a SyncMessageError.
735     ASSERT(!decoder.isInvalid());
736
737     if (replyEncoder)
738         sendSyncReply(std::move(replyEncoder));
739 }
740
741 void Connection::dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString)
742 {
743     ASSERT(&RunLoop::current() == &m_clientRunLoop);
744
745     if (!m_client)
746         return;
747
748     m_client->didReceiveInvalidMessage(this, StringReference(messageReceiverNameString.data(), messageReceiverNameString.length()), StringReference(messageNameString.data(), messageNameString.length()));
749 }
750
751 void Connection::didFailToSendSyncMessage()
752 {
753     if (!m_shouldExitOnSyncMessageSendFailure)
754         return;
755
756     exit(0);
757 }
758
759 void Connection::enqueueIncomingMessage(std::unique_ptr<MessageDecoder> incomingMessage)
760 {
761     {
762         MutexLocker locker(m_incomingMessagesLock);
763         m_incomingMessages.append(std::move(incomingMessage));
764     }
765
766     m_clientRunLoop.dispatch(WTF::bind(&Connection::dispatchOneMessage, this));
767 }
768
769 void Connection::dispatchMessage(MessageDecoder& decoder)
770 {
771     m_client->didReceiveMessage(this, decoder);
772 }
773
774 void Connection::dispatchMessage(std::unique_ptr<MessageDecoder> message)
775 {
776     // If there's no client, return. We do this after calling releaseArguments so that
777     // the ArgumentDecoder message will be freed.
778     if (!m_client)
779         return;
780
781     m_inDispatchMessageCount++;
782
783     if (message->shouldDispatchMessageWhenWaitingForSyncReply())
784         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
785
786     bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
787     m_didReceiveInvalidMessage = false;
788
789     if (message->isSyncMessage())
790         dispatchSyncMessage(*message);
791     else
792         dispatchMessage(*message);
793
794     m_didReceiveInvalidMessage |= message->isInvalid();
795     m_inDispatchMessageCount--;
796
797     // FIXME: For Delayed synchronous messages, we should not decrement the counter until we send a response.
798     // Otherwise, we would deadlock if processing the message results in a sync message back after we exit this function.
799     if (message->shouldDispatchMessageWhenWaitingForSyncReply())
800         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
801
802     if (m_didReceiveInvalidMessage && m_client)
803         m_client->didReceiveInvalidMessage(this, message->messageReceiverName(), message->messageName());
804
805     m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
806 }
807
808 void Connection::dispatchOneMessage()
809 {
810     std::unique_ptr<MessageDecoder> message;
811
812     {
813         MutexLocker locker(m_incomingMessagesLock);
814         if (m_incomingMessages.isEmpty())
815             return;
816
817         message = m_incomingMessages.takeFirst();
818     }
819
820     dispatchMessage(std::move(message));
821 }
822
823 void Connection::wakeUpRunLoop()
824 {
825     m_clientRunLoop.wakeUp();
826 }
827
828 } // namespace IPC