2bd40235f16f566f08d1eb442c1ce8c00f20030a
[WebKit-https.git] / Source / WebKit2 / Platform / CoreIPC / Connection.cpp
1 /*
2  * Copyright (C) 2010 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "Connection.h"
28
29 #include "BinarySemaphore.h"
30 #include "CoreIPCMessageKinds.h"
31 #include "RunLoop.h"
32 #include "WorkItem.h"
33 #include <wtf/CurrentTime.h>
34
35 using namespace std;
36
37 namespace CoreIPC {
38
39 class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
40 public:
41     static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*);
42     ~SyncMessageState();
43
44     void wakeUpClientRunLoop()
45     {
46         m_waitForSyncReplySemaphore.signal();
47     }
48
49     bool wait(double absoluteTime)
50     {
51         return m_waitForSyncReplySemaphore.wait(absoluteTime);
52     }
53
54 #if PLATFORM(WIN)
55     bool waitWhileDispatchingSentWin32Messages(double absoluteTime, const Vector<HWND>& windowsToReceiveMessages)
56     {
57         return RunLoop::dispatchSentMessagesUntil(windowsToReceiveMessages, m_waitForSyncReplySemaphore, absoluteTime);
58     }
59 #endif
60
61     // Returns true if this message will be handled on a client thread that is currently
62     // waiting for a reply to a synchronous message.
63     bool processIncomingMessage(Connection*, IncomingMessage&);
64
65     void dispatchMessages();
66
67 private:
68     explicit SyncMessageState(RunLoop*);
69
70     typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap;
71     static SyncMessageStateMap& syncMessageStateMap()
72     {
73         DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ());
74         return syncMessageStateMap;
75     }
76
77     static Mutex& syncMessageStateMapMutex()
78     {
79         DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ());
80         return syncMessageStateMapMutex;
81     }
82
83     void dispatchMessageAndResetDidScheduleDispatchMessagesWork();
84
85     RunLoop* m_runLoop;
86     BinarySemaphore m_waitForSyncReplySemaphore;
87
88     // Protects m_didScheduleDispatchMessagesWork and m_messagesToDispatchWhileWaitingForSyncReply.
89     Mutex m_mutex;
90
91     bool m_didScheduleDispatchMessagesWork;
92
93     struct ConnectionAndIncomingMessage {
94         RefPtr<Connection> connection;
95         IncomingMessage incomingMessage;
96     };
97     Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
98 };
99
100 PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
101 {
102     MutexLocker locker(syncMessageStateMapMutex());
103     pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0);
104
105     if (!result.second) {
106         ASSERT(result.first->second);
107         return result.first->second;
108     }
109
110     RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
111     result.first->second = syncMessageState.get();
112
113     return syncMessageState.release();
114 }
115
116 Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop)
117     : m_runLoop(runLoop)
118     , m_didScheduleDispatchMessagesWork(false)
119 {
120 }
121
122 Connection::SyncMessageState::~SyncMessageState()
123 {
124     MutexLocker locker(syncMessageStateMapMutex());
125     
126     ASSERT(syncMessageStateMap().contains(m_runLoop));
127     syncMessageStateMap().remove(m_runLoop);
128 }
129
130 bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage)
131 {
132     MessageID messageID = incomingMessage.messageID();
133     if (!messageID.shouldDispatchMessageWhenWaitingForSyncReply())
134         return false;
135
136     ConnectionAndIncomingMessage connectionAndIncomingMessage;
137     connectionAndIncomingMessage.connection = connection;
138     connectionAndIncomingMessage.incomingMessage = incomingMessage;
139
140     {
141         MutexLocker locker(m_mutex);
142         
143         if (!m_didScheduleDispatchMessagesWork) {
144             m_runLoop->scheduleWork(WorkItem::create(this, &SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork));
145             m_didScheduleDispatchMessagesWork = true;
146         }
147
148         m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage);
149     }
150
151     wakeUpClientRunLoop();
152
153     return true;
154 }
155
156 void Connection::SyncMessageState::dispatchMessages()
157 {
158     ASSERT(m_runLoop == RunLoop::current());
159
160     Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
161
162     {
163         MutexLocker locker(m_mutex);
164         m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
165     }
166
167     for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
168         ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
169         connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage);
170     }
171 }
172
173 void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork()
174 {
175     {
176         MutexLocker locker(m_mutex);
177         ASSERT(m_didScheduleDispatchMessagesWork);
178         m_didScheduleDispatchMessagesWork = false;
179     }
180
181     dispatchMessages();
182 }
183
184 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
185 {
186     return adoptRef(new Connection(identifier, true, client, clientRunLoop));
187 }
188
189 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
190 {
191     return adoptRef(new Connection(identifier, false, client, clientRunLoop));
192 }
193
194 Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
195     : m_client(client)
196     , m_isServer(isServer)
197     , m_syncRequestID(0)
198     , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
199     , m_shouldExitOnSyncMessageSendFailure(false)
200     , m_didCloseOnConnectionWorkQueueCallback(0)
201     , m_isConnected(false)
202     , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
203     , m_clientRunLoop(clientRunLoop)
204     , m_inDispatchMessageCount(0)
205     , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
206     , m_didReceiveInvalidMessage(false)
207     , m_defaultSyncMessageTimeout(NoTimeout)
208     , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
209     , m_shouldWaitForSyncReplies(true)
210 {
211     ASSERT(m_client);
212
213     platformInitialize(identifier);
214 }
215
216 Connection::~Connection()
217 {
218     ASSERT(!isValid());
219
220     m_connectionQueue.invalidate();
221 }
222
223 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
224 {
225     ASSERT(!m_isConnected);
226
227     m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
228 }
229
230 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
231 {
232     ASSERT(!m_isConnected);
233
234     m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
235 }
236
237 void Connection::addQueueClient(QueueClient* queueClient)
238 {
239     MutexLocker locker(m_connectionQueueClientsMutex);
240     ASSERT(!m_connectionQueueClients.contains(queueClient));
241
242     m_connectionQueueClients.append(queueClient);
243 }
244
245 void Connection::removeQueueClient(QueueClient* queueClient)
246 {
247     MutexLocker locker(m_connectionQueueClientsMutex);
248     size_t index = m_connectionQueueClients.find(queueClient);
249
250     ASSERT(index != notFound);
251     m_connectionQueueClients.remove(index);
252 }
253
254 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
255 {
256     ASSERT(!m_isConnected);
257
258     m_didCloseOnConnectionWorkQueueCallback = callback;    
259 }
260
261 void Connection::invalidate()
262 {
263     if (!isValid()) {
264         // Someone already called invalidate().
265         return;
266     }
267     
268     // Reset the client.
269     m_client = 0;
270
271     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
272 }
273
274 void Connection::markCurrentlyDispatchedMessageAsInvalid()
275 {
276     // This should only be called while processing a message.
277     ASSERT(m_inDispatchMessageCount > 0);
278
279     m_didReceiveInvalidMessage = true;
280 }
281
282 void Connection::setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout)
283 {
284     ASSERT(defaultSyncMessageTimeout != DefaultTimeout);
285
286     m_defaultSyncMessageTimeout = defaultSyncMessageTimeout;
287 }
288
289 PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID)
290 {
291     OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
292
293     // Encode the sync request ID.
294     syncRequestID = ++m_syncRequestID;
295     argumentEncoder->encode(syncRequestID);
296
297     return argumentEncoder.release();
298 }
299
300 bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags)
301 {
302     if (!isValid())
303         return false;
304
305     if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply
306         && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
307             || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
308         messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply);
309
310     MutexLocker locker(m_outgoingMessagesLock);
311     m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
312     
313     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
314     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
315     return true;
316 }
317
318 bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)
319 {
320     return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments);
321 }
322
323 PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
324 {
325     // First, check if this message is already in the incoming messages queue.
326     {
327         MutexLocker locker(m_incomingMessagesLock);
328
329         for (size_t i = 0; i < m_incomingMessages.size(); ++i) {
330             IncomingMessage& message = m_incomingMessages[i];
331
332             if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
333                 OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
334
335                 // Erase the incoming message.
336                 m_incomingMessages.remove(i);
337                 return arguments.release();
338             }
339         }
340     }
341     
342     double absoluteTime = currentTime() + timeout;
343     
344     std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
345     
346     {
347         MutexLocker locker(m_waitForMessageMutex);
348
349         // We don't support having multiple clients wait for the same message.
350         ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
351     
352         // Insert our pending wait.
353         m_waitForMessageMap.set(messageAndDestination, 0);
354     }
355     
356     // Now wait for it to be set.
357     while (true) {
358         MutexLocker locker(m_waitForMessageMutex);
359
360         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
361         if (it->second) {
362             // FIXME: m_waitForMessageMap should really hold OwnPtrs to
363             // ArgumentDecoders, but HashMap doesn't currently support OwnPtrs.
364             OwnPtr<ArgumentDecoder> arguments = adoptPtr(it->second);
365             m_waitForMessageMap.remove(it);
366             
367             return arguments.release();
368         }
369         
370         // Now we wait.
371         if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) {
372             // We timed out, now remove the pending wait.
373             m_waitForMessageMap.remove(messageAndDestination);
374
375             break;
376         }
377     }
378     
379     return nullptr;
380 }
381
382 PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
383 {
384     // We only allow sending sync messages from the client run loop.
385     ASSERT(RunLoop::current() == m_clientRunLoop);
386
387     if (!isValid()) {
388         didFailToSendSyncMessage();
389         return nullptr;
390     }
391
392     // Push the pending sync reply information on our stack.
393     {
394         MutexLocker locker(m_syncReplyStateMutex);
395         if (!m_shouldWaitForSyncReplies) {
396             didFailToSendSyncMessage();
397             return nullptr;
398         }
399
400         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
401     }
402
403     // First send the message.
404     sendMessage(messageID.messageIDWithAddedFlags(MessageID::SyncMessage), encoder, DispatchMessageEvenWhenWaitingForSyncReply);
405
406     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
407     // keep an extra reference to the connection here in case it's invalidated.
408     RefPtr<Connection> protect(this);
409     OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout);
410
411     // Finally, pop the pending sync reply information.
412     {
413         MutexLocker locker(m_syncReplyStateMutex);
414         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
415         m_pendingSyncReplies.removeLast();
416     }
417
418     if (!reply)
419         didFailToSendSyncMessage();
420
421     return reply.release();
422 }
423
424 PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout)
425 {
426     if (timeout == DefaultTimeout)
427         timeout = m_defaultSyncMessageTimeout;
428
429     // Use a really long timeout.
430     if (timeout == NoTimeout)
431         timeout = 1e10;
432
433     double absoluteTime = currentTime() + timeout;
434
435     bool timedOut = false;
436     while (!timedOut) {
437         // First, check if we have any messages that we need to process.
438         m_syncMessageState->dispatchMessages();
439         
440         {
441             MutexLocker locker(m_syncReplyStateMutex);
442
443             // Second, check if there is a sync reply at the top of the stack.
444             ASSERT(!m_pendingSyncReplies.isEmpty());
445             
446             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
447             ASSERT(pendingSyncReply.syncRequestID == syncRequestID);
448             
449             // We found the sync reply, or the connection was closed.
450             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
451                 return pendingSyncReply.releaseReplyDecoder();
452         }
453
454         // Processing a sync message could cause the connection to be invalidated.
455         // (If the handler ends up calling Connection::invalidate).
456         // If that happens, we need to stop waiting, or we'll hang since we won't get
457         // any more incoming messages.
458         if (!isValid())
459             return nullptr;
460
461         // We didn't find a sync reply yet, keep waiting.
462 #if PLATFORM(WIN)
463         timedOut = !m_syncMessageState->waitWhileDispatchingSentWin32Messages(absoluteTime, m_client->windowsToReceiveSentMessagesWhileWaitingForSyncReply());
464 #else
465         timedOut = !m_syncMessageState->wait(absoluteTime);
466 #endif
467     }
468
469     // We timed out.
470     if (m_client)
471         m_client->syncMessageSendTimedOut(this);
472
473     return nullptr;
474 }
475
476 void Connection::processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments)
477 {
478     MutexLocker locker(m_syncReplyStateMutex);
479
480     // Go through the stack of sync requests that have pending replies and see which one
481     // this reply is for.
482     for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
483         PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
484
485         if (pendingSyncReply.syncRequestID != arguments->destinationID())
486             continue;
487
488         ASSERT(!pendingSyncReply.replyDecoder);
489
490         pendingSyncReply.replyDecoder = arguments.leakPtr();
491         pendingSyncReply.didReceiveReply = true;
492
493         // We got a reply to the last send message, wake up the client run loop so it can be processed.
494         if (i == m_pendingSyncReplies.size())
495             m_syncMessageState->wakeUpClientRunLoop();
496
497         return;
498     }
499
500     // If we get here, it means we got a reply for a message that wasn't in the sync request stack.
501     // This can happen if the send timed out, so it's fine to ignore.
502 }
503
504 void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
505 {
506     // Check if this is a sync reply.
507     if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
508         processIncomingSyncReply(arguments);
509         return;
510     }
511
512     IncomingMessage incomingMessage(messageID, arguments);
513
514     // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
515     // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
516     // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
517     if (m_syncMessageState->processIncomingMessage(this, incomingMessage))
518         return;
519
520     // Check if we're waiting for this message.
521     {
522         MutexLocker locker(m_waitForMessageMutex);
523         
524         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID()));
525         if (it != m_waitForMessageMap.end()) {
526             it->second = incomingMessage.releaseArguments().leakPtr();
527             ASSERT(it->second);
528         
529             m_waitForMessageCondition.signal();
530             return;
531         }
532     }
533
534     // Hand off the message to the connection queue clients.
535     {
536         MutexLocker locker(m_connectionQueueClientsMutex);
537
538         for (size_t i = 0; i < m_connectionQueueClients.size(); ++i) {
539             if (!m_connectionQueueClients[i]->willProcessMessageOnClientRunLoop(this, incomingMessage.messageID(), incomingMessage.arguments())) {
540                 // A connection queue client handled the message, our work here is done.
541                 incomingMessage.releaseArguments();
542                 return;
543             }
544         }
545     }
546
547     enqueueIncomingMessage(incomingMessage);
548 }
549
550 void Connection::postConnectionDidCloseOnConnectionWorkQueue()
551 {
552     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::connectionDidClose));
553 }
554
555 void Connection::connectionDidClose()
556 {
557     // The connection is now invalid.
558     platformInvalidate();
559
560     {
561         MutexLocker locker(m_syncReplyStateMutex);
562
563         ASSERT(m_shouldWaitForSyncReplies);
564         m_shouldWaitForSyncReplies = false;
565
566         if (!m_pendingSyncReplies.isEmpty())
567             m_syncMessageState->wakeUpClientRunLoop();
568     }
569
570     if (m_didCloseOnConnectionWorkQueueCallback)
571         m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this);
572
573     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
574 }
575
576 void Connection::dispatchConnectionDidClose()
577 {
578     // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
579     // then the client will be null here.
580     if (!m_client)
581         return;
582
583
584     // Because we define a connection as being "valid" based on wheter it has a null client, we null out
585     // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
586     // will then wait indefinitely for a reply.
587     Client* client = m_client;
588     m_client = 0;
589     
590     client->didClose(this);
591 }
592
593 bool Connection::canSendOutgoingMessages() const
594 {
595     return m_isConnected && platformCanSendOutgoingMessages();
596 }
597
598 void Connection::sendOutgoingMessages()
599 {
600     if (!canSendOutgoingMessages())
601         return;
602
603     while (true) {
604         OutgoingMessage message;
605         {
606             MutexLocker locker(m_outgoingMessagesLock);
607             if (m_outgoingMessages.isEmpty())
608                 break;
609             message = m_outgoingMessages.takeFirst();
610         }
611
612         if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
613             break;
614     }
615 }
616
617 void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments)
618 {
619     ASSERT(messageID.isSync());
620
621     uint64_t syncRequestID = 0;
622     if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) {
623         // We received an invalid sync message.
624         arguments->markInvalid();
625         return;
626     }
627
628     OwnPtr<ArgumentEncoder> replyEncoder = ArgumentEncoder::create(syncRequestID);
629
630     // Hand off both the decoder and encoder to the client.
631     m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder);
632
633     // FIXME: If the message was invalid, we should send back a SyncMessageError.
634     ASSERT(!arguments->isInvalid());
635
636     if (replyEncoder)
637         sendSyncReply(replyEncoder.release());
638 }
639
640 void Connection::didFailToSendSyncMessage()
641 {
642     if (!m_shouldExitOnSyncMessageSendFailure)
643         return;
644
645     exit(0);
646 }
647
648 void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage)
649 {
650     MutexLocker locker(m_incomingMessagesLock);
651     m_incomingMessages.append(incomingMessage);
652
653     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
654 }
655
656 void Connection::dispatchMessage(IncomingMessage& message)
657 {
658     OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
659
660     // If there's no client, return. We do this after calling releaseArguments so that
661     // the ArgumentDecoder message will be freed.
662     if (!m_client)
663         return;
664
665     m_inDispatchMessageCount++;
666
667     if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
668         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
669
670     bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
671     m_didReceiveInvalidMessage = false;
672
673     if (message.messageID().isSync())
674         dispatchSyncMessage(message.messageID(), arguments.get());
675     else
676         m_client->didReceiveMessage(this, message.messageID(), arguments.get());
677
678     m_didReceiveInvalidMessage |= arguments->isInvalid();
679     m_inDispatchMessageCount--;
680
681     if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
682         m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
683
684     if (m_didReceiveInvalidMessage && m_client)
685         m_client->didReceiveInvalidMessage(this, message.messageID());
686
687     m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
688 }
689
690 void Connection::dispatchMessages()
691 {
692     Vector<IncomingMessage> incomingMessages;
693     
694     {
695         MutexLocker locker(m_incomingMessagesLock);
696         m_incomingMessages.swap(incomingMessages);
697     }
698
699     for (size_t i = 0; i < incomingMessages.size(); ++i)
700         dispatchMessage(incomingMessages[i]);
701 }
702
703 } // namespace CoreIPC