2 * Copyright (C) 2010 Apple Inc. All rights reserved.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
27 #include "Connection.h"
29 #include "BinarySemaphore.h"
30 #include "CoreIPCMessageKinds.h"
33 #include <wtf/CurrentTime.h>
// Reference-counted (ThreadSafeRefCounted) state keyed by RunLoop. It owns the
// semaphore a client run loop blocks on while waiting for a sync reply, plus the
// queue of incoming messages that must be dispatched during such a wait.
// Instances are found through a static RunLoop* -> SyncMessageState* map that is
// guarded by a static mutex; see getOrCreate().
39 class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
41 static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*);
// Signals the semaphore that wait() / waitWhileDispatchingSentWin32Messages() block on.
44 void wakeUpClientRunLoop()
46 m_waitForSyncReplySemaphore.signal();
// Blocks on the semaphore until it is signaled or absoluteTime is reached;
// returns the semaphore's wait() result.
49 bool wait(double absoluteTime)
51 return m_waitForSyncReplySemaphore.wait(absoluteTime);
// Windows variant of wait(): delegates to RunLoop::dispatchSentMessagesUntil with the
// given windows, the same semaphore and the same deadline.
55 bool waitWhileDispatchingSentWin32Messages(double absoluteTime, const Vector<HWND>& windowsToReceiveMessages)
57 return RunLoop::dispatchSentMessagesUntil(windowsToReceiveMessages, m_waitForSyncReplySemaphore, absoluteTime);
61 // Returns true if this message will be handled on a client thread that is currently
62 // waiting for a reply to a synchronous message.
63 bool processIncomingMessage(Connection*, IncomingMessage&);
// Dispatches every message queued by processIncomingMessage().
65 void dispatchMessages();
68 explicit SyncMessageState(RunLoop*);
// Process-wide registry of the SyncMessageState for each RunLoop. Values are raw pointers.
70 typedef HashMap<RunLoop*, SyncMessageState*> SyncMessageStateMap;
71 static SyncMessageStateMap& syncMessageStateMap()
73 DEFINE_STATIC_LOCAL(SyncMessageStateMap, syncMessageStateMap, ());
74 return syncMessageStateMap;
// Mutex that getOrCreate() and the destructor lock around accesses to syncMessageStateMap().
77 static Mutex& syncMessageStateMapMutex()
79 DEFINE_STATIC_LOCAL(Mutex, syncMessageStateMapMutex, ());
80 return syncMessageStateMapMutex;
83 void dispatchMessageAndResetDidScheduleDispatchMessagesWork();
// Semaphore signaled by wakeUpClientRunLoop() and waited on by wait().
86 BinarySemaphore m_waitForSyncReplySemaphore;
88 // Protects m_didScheduleDispatchMessagesWork and m_messagesToDispatchWhileWaitingForSyncReply.
// True while a dispatch work item scheduled by processIncomingMessage() is outstanding.
91 bool m_didScheduleDispatchMessagesWork;
// A queued message together with a strong reference to the connection it arrived on.
93 struct ConnectionAndIncomingMessage {
94 RefPtr<Connection> connection;
95 IncomingMessage incomingMessage;
97 Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
// Returns the SyncMessageState registered for runLoop, creating and registering
// a new one when the map has no entry yet. The map mutex is taken before the map
// is touched.
100 PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
102 MutexLocker locker(syncMessageStateMapMutex());
// add() inserts a null placeholder; result.second is false when an entry already existed.
103 pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0);
105 if (!result.second) {
106 ASSERT(result.first->second);
107 return result.first->second;
110 RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
// The map keeps only a raw pointer to the new object; the caller receives the reference.
111 result.first->second = syncMessageState.get();
113 return syncMessageState.release();
// Constructs the state for runLoop with no dispatch work scheduled.
// NOTE(review): other member initializers are not visible in this view.
116 Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop)
118 , m_didScheduleDispatchMessagesWork(false)
// Unregisters this object: removes the m_runLoop entry from syncMessageStateMap()
// while holding the map mutex.
122 Connection::SyncMessageState::~SyncMessageState()
124 MutexLocker locker(syncMessageStateMapMutex());
126 ASSERT(syncMessageStateMap().contains(m_runLoop));
127 syncMessageStateMap().remove(m_runLoop);
// Queues incomingMessage (paired with a reference to connection) for dispatch on
// m_runLoop, schedules a dispatch work item if none is pending, and wakes the
// client run loop. The message ID is first tested with
// shouldDispatchMessageWhenWaitingForSyncReply().
// NOTE(review): the return statements are not visible in this view; per the
// declaration comment, true means the message will be handled by a waiting client thread.
130 bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage)
132 MessageID messageID = incomingMessage.messageID();
133 if (!messageID.shouldDispatchMessageWhenWaitingForSyncReply())
136 ConnectionAndIncomingMessage connectionAndIncomingMessage;
137 connectionAndIncomingMessage.connection = connection;
138 connectionAndIncomingMessage.incomingMessage = incomingMessage;
141 MutexLocker locker(m_mutex);
// Schedule at most one work item at a time; the flag is cleared again by
// dispatchMessageAndResetDidScheduleDispatchMessagesWork().
143 if (!m_didScheduleDispatchMessagesWork) {
144 m_runLoop->scheduleWork(WorkItem::create(this, &SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork));
145 m_didScheduleDispatchMessagesWork = true;
148 m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage);
151 wakeUpClientRunLoop();
// Dispatches every queued message. Must run on m_runLoop (asserted). The pending
// queue is swapped into a local vector while m_mutex is held, then each entry is
// passed to its connection's dispatchMessage().
156 void Connection::SyncMessageState::dispatchMessages()
158 ASSERT(m_runLoop == RunLoop::current());
160 Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
163 MutexLocker locker(m_mutex);
// After the swap the member queue is empty and the local vector holds the backlog.
164 m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
167 for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
168 ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
169 connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage);
// Work item scheduled on m_runLoop by processIncomingMessage(). Clears
// m_didScheduleDispatchMessagesWork under m_mutex so a later incoming message
// can schedule a new work item.
// NOTE(review): the name suggests it also dispatches the queued messages; that
// call is not visible in this view.
173 void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesWork()
176 MutexLocker locker(m_mutex);
177 ASSERT(m_didScheduleDispatchMessagesWork);
178 m_didScheduleDispatchMessagesWork = false;
// Factory: creates a Connection with isServer == true and returns the adopted reference.
184 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
186 return adoptRef(new Connection(identifier, true, client, clientRunLoop));
// Factory: creates a Connection with isServer == false and returns the adopted reference.
189 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
191 return adoptRef(new Connection(identifier, false, client, clientRunLoop));
// Initializes a connection that is not yet connected: flags and counters start
// cleared, the default sync timeout is NoTimeout, and waiting for sync replies is
// enabled. The SyncMessageState for clientRunLoop is obtained via getOrCreate(),
// and platform-specific setup is delegated to platformInitialize(identifier).
// NOTE(review): some member initializers are not visible in this view.
194 Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
196 , m_isServer(isServer)
198 , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
199 , m_shouldExitOnSyncMessageSendFailure(false)
200 , m_didCloseOnConnectionWorkQueueCallback(0)
201 , m_isConnected(false)
202 , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
203 , m_clientRunLoop(clientRunLoop)
204 , m_inDispatchMessageCount(0)
205 , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
206 , m_didReceiveInvalidMessage(false)
207 , m_defaultSyncMessageTimeout(NoTimeout)
208 , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
209 , m_shouldWaitForSyncReplies(true)
213 platformInitialize(identifier);
// Destructor: invalidates the connection work queue.
// NOTE(review): any preceding statements or assertions are not visible in this view.
216 Connection::~Connection()
220 m_connectionQueue.invalidate();
// Stores the flag that sendMessage() consults when deciding whether to add the
// DispatchMessageWhenWaitingForSyncReply flag. Asserts the connection is not yet connected.
223 void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
225 ASSERT(!m_isConnected);
227 m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
// Stores the flag checked by didFailToSendSyncMessage(). Asserts the connection
// is not yet connected.
230 void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
232 ASSERT(!m_isConnected);
234 m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
// Appends queueClient to the list that processIncomingMessage() offers messages
// to. Holds m_connectionQueueClientsMutex; asserts the client is not already in the list.
237 void Connection::addQueueClient(QueueClient* queueClient)
239 MutexLocker locker(m_connectionQueueClientsMutex);
240 ASSERT(!m_connectionQueueClients.contains(queueClient));
242 m_connectionQueueClients.append(queueClient);
// Removes queueClient from the queue-client list under m_connectionQueueClientsMutex.
// The client is expected to be present; this is only checked by an assertion.
245 void Connection::removeQueueClient(QueueClient* queueClient)
247 MutexLocker locker(m_connectionQueueClientsMutex);
248 size_t index = m_connectionQueueClients.find(queueClient);
250 ASSERT(index != notFound);
251 m_connectionQueueClients.remove(index);
// Stores the callback that connectionDidClose() invokes with the connection
// queue and this connection. Asserts the connection is not yet connected.
254 void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
256 ASSERT(!m_isConnected);
258 m_didCloseOnConnectionWorkQueueCallback = callback;
// Invalidates the connection by scheduling platformInvalidate() on the connection work queue.
// NOTE(review): the guard that detects a repeated invalidate() call, and any
// state it resets, is not visible in this view.
261 void Connection::invalidate()
264 // Someone already called invalidate().
271 m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
// Flags the message currently being dispatched as invalid. dispatchMessage()
// reads this flag after the client handler returns.
274 void Connection::markCurrentlyDispatchedMessageAsInvalid()
276 // This should only be called while processing a message.
277 ASSERT(m_inDispatchMessageCount > 0);
279 m_didReceiveInvalidMessage = true;
// Sets the timeout that waitForSyncReply() substitutes when called with
// DefaultTimeout. Passing DefaultTimeout itself is rejected by assertion.
282 void Connection::setDefaultSyncMessageTimeout(double defaultSyncMessageTimeout)
284 ASSERT(defaultSyncMessageTimeout != DefaultTimeout);
286 m_defaultSyncMessageTimeout = defaultSyncMessageTimeout;
// Creates an ArgumentEncoder for destinationID, allocates the next sync request
// ID by pre-incrementing m_syncRequestID, and encodes that ID into the encoder.
// The new ID is written to the syncRequestID out-parameter. Returns the encoder.
289 PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID)
291 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
293 // Encode the sync request ID.
294 syncRequestID = ++m_syncRequestID;
295 argumentEncoder->encode(syncRequestID);
297 return argumentEncoder.release();
// Queues an outgoing message and schedules sendOutgoingMessages() on the connection work queue.
// When DispatchMessageEvenWhenWaitingForSyncReply is set in messageSendFlags, the
// message ID gains the DispatchMessageWhenWaitingForSyncReply flag. If the
// "only when processing such a message" mode is enabled, the flag is added only
// while such a message is being dispatched (counter is non-zero).
// NOTE(review): the validity check and the return values are not visible in this view.
300 bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments, unsigned messageSendFlags)
305 if (messageSendFlags & DispatchMessageEvenWhenWaitingForSyncReply
306 && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
307 || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
308 messageID = messageID.messageIDWithAddedFlags(MessageID::DispatchMessageWhenWaitingForSyncReply);
// m_outgoingMessages is appended to with m_outgoingMessagesLock held.
310 MutexLocker locker(m_outgoingMessagesLock);
311 m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
313 // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
314 m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
// Sends arguments as a CoreIPCMessage::SyncMessageReply via sendMessage() and
// returns its result.
318 bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)
320 return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments);
// Waits for a message with the given messageID and destinationID.
// Steps visible here:
//   1. Scan m_incomingMessages (under m_incomingMessagesLock); on a match,
//      remove it from the deque and return its arguments.
//   2. Otherwise compute the deadline as currentTime() + timeout and register a
//      pending wait in m_waitForMessageMap with a null decoder.
//   3. Wait on m_waitForMessageCondition; when the entry's decoder is taken,
//      adopt it, remove the entry and return it. If the timed wait fails,
//      remove the pending entry.
// NOTE(review): the loop construct, the check that the map entry has been
// filled, and the timeout return value are not visible in this view.
323 PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
325 // First, check if this message is already in the incoming messages queue.
327 MutexLocker locker(m_incomingMessagesLock);
329 for (Deque<IncomingMessage>::iterator it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) {
330 IncomingMessage& message = *it;
332 if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
333 OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
335 m_incomingMessages.remove(it);
336 return arguments.release();
341 double absoluteTime = currentTime() + timeout;
// Key used in m_waitForMessageMap: (message ID as integer, destination ID).
343 std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
346 MutexLocker locker(m_waitForMessageMutex);
348 // We don't support having multiple clients wait for the same message.
349 ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
351 // Insert our pending wait.
352 m_waitForMessageMap.set(messageAndDestination, 0);
355 // Now wait for it to be set.
357 MutexLocker locker(m_waitForMessageMutex);
359 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
361 // FIXME: m_waitForMessageMap should really hold OwnPtrs to
362 // ArgumentDecoders, but HashMap doesn't currently support OwnPtrs.
// The raw pointer stored by processIncomingMessage() is adopted here.
363 OwnPtr<ArgumentDecoder> arguments = adoptPtr(it->second);
364 m_waitForMessageMap.remove(it);
366 return arguments.release();
370 if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) {
371 // We timed out, now remove the pending wait.
372 m_waitForMessageMap.remove(messageAndDestination);
// Sends a synchronous message and blocks until its reply arrives (or waiting
// stops), returning the reply decoder. Must be called on the client run loop (asserted).
// Flow visible here: push a PendingSyncReply for syncRequestID, send the message
// with the SyncMessage flag and DispatchMessageEvenWhenWaitingForSyncReply, wait
// in waitForSyncReply(), then pop the PendingSyncReply.
// NOTE(review): the conditions guarding the didFailToSendSyncMessage() calls and
// the early returns are not visible in this view.
381 PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
383 // We only allow sending sync messages from the client run loop.
384 ASSERT(RunLoop::current() == m_clientRunLoop);
387 didFailToSendSyncMessage();
391 // Push the pending sync reply information on our stack.
393 MutexLocker locker(m_syncReplyStateMutex);
// m_shouldWaitForSyncReplies is cleared by connectionDidClose().
394 if (!m_shouldWaitForSyncReplies) {
395 didFailToSendSyncMessage();
399 m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
402 // First send the message.
403 sendMessage(messageID.messageIDWithAddedFlags(MessageID::SyncMessage), encoder, DispatchMessageEvenWhenWaitingForSyncReply);
405 // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
406 // keep an extra reference to the connection here in case it's invalidated.
407 RefPtr<Connection> protect(this);
408 OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout);
410 // Finally, pop the pending sync reply information.
412 MutexLocker locker(m_syncReplyStateMutex);
413 ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
414 m_pendingSyncReplies.removeLast();
418 didFailToSendSyncMessage();
420 return reply.release();
// Blocks the client run loop until the reply for syncRequestID is available,
// the connection stops waiting for sync replies, or the deadline passes.
// DefaultTimeout is replaced by m_defaultSyncMessageTimeout. Each round first
// dispatches messages queued in m_syncMessageState, then checks the top of
// m_pendingSyncReplies, then waits on the SyncMessageState semaphore. On
// timeout the client is told via syncMessageSendTimedOut().
// NOTE(review): the loop construct, the value substituted for NoTimeout, the
// invalidation check, the platform #if selecting the wait call, and the final
// return are not visible in this view.
423 PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout)
425 if (timeout == DefaultTimeout)
426 timeout = m_defaultSyncMessageTimeout;
428 // Use a really long timeout.
429 if (timeout == NoTimeout)
432 double absoluteTime = currentTime() + timeout;
434 bool timedOut = false;
436 // First, check if we have any messages that we need to process.
437 m_syncMessageState->dispatchMessages();
440 MutexLocker locker(m_syncReplyStateMutex);
442 // Second, check if there is a sync reply at the top of the stack.
443 ASSERT(!m_pendingSyncReplies.isEmpty());
445 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
446 ASSERT(pendingSyncReply.syncRequestID == syncRequestID);
448 // We found the sync reply, or the connection was closed.
449 if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
450 return pendingSyncReply.releaseReplyDecoder();
453 // Processing a sync message could cause the connection to be invalidated.
454 // (If the handler ends up calling Connection::invalidate).
455 // If that happens, we need to stop waiting, or we'll hang since we won't get
456 // any more incoming messages.
460 // We didn't find a sync reply yet, keep waiting.
// Both wait variants return false on timeout, hence the negation.
462 timedOut = !m_syncMessageState->waitWhileDispatchingSentWin32Messages(absoluteTime, m_client->windowsToReceiveSentMessagesWhileWaitingForSyncReply());
464 timedOut = !m_syncMessageState->wait(absoluteTime);
470 m_client->syncMessageSendTimedOut(this);
// Matches an incoming sync reply to its pending request. The stack of pending
// replies is searched from newest to oldest under m_syncReplyStateMutex, comparing
// each syncRequestID with the reply's destinationID. On a match, ownership of the
// decoder moves to the pending entry as a raw pointer, and the client run loop is
// woken only when the match is the most recent request.
// NOTE(review): the `continue`/`return` statements are not visible in this view.
475 void Connection::processIncomingSyncReply(PassOwnPtr<ArgumentDecoder> arguments)
477 MutexLocker locker(m_syncReplyStateMutex);
479 // Go through the stack of sync requests that have pending replies and see which one
480 // this reply is for.
// i counts down from size() to 1 so that the unsigned index never underflows.
481 for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
482 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
484 if (pendingSyncReply.syncRequestID != arguments->destinationID())
487 ASSERT(!pendingSyncReply.replyDecoder);
489 pendingSyncReply.replyDecoder = arguments.leakPtr();
490 pendingSyncReply.didReceiveReply = true;
492 // We got a reply to the last send message, wake up the client run loop so it can be processed.
493 if (i == m_pendingSyncReplies.size())
494 m_syncMessageState->wakeUpClientRunLoop();
499 // If we get here, it means we got a reply for a message that wasn't in the sync request stack.
500 // This can happen if the send timed out, so it's fine to ignore.
// Routes a message received from the remote side. In order:
//   1. Sync replies go to processIncomingSyncReply().
//   2. The message is offered to the shared SyncMessageState.
//   3. If a waitForMessage() call is pending for this (ID, destination) pair,
//      the decoder is stored in m_waitForMessageMap and the condition is signaled.
//   4. The message is offered to each registered queue client.
//   5. Otherwise it is queued for the client run loop via enqueueIncomingMessage().
// NOTE(review): the early `return` statements after each step are not visible in this view.
503 void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
505 // Check if this is a sync reply.
506 if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
507 processIncomingSyncReply(arguments);
511 IncomingMessage incomingMessage(messageID, arguments);
513 // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
514 // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
515 // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
516 if (m_syncMessageState->processIncomingMessage(this, incomingMessage))
519 // Check if we're waiting for this message.
521 MutexLocker locker(m_waitForMessageMutex);
523 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID()));
524 if (it != m_waitForMessageMap.end()) {
// The decoder is stored as a raw pointer; waitForMessage() adopts it.
525 it->second = incomingMessage.releaseArguments().leakPtr();
528 m_waitForMessageCondition.signal();
533 // Hand off the message to the connection queue clients.
535 MutexLocker locker(m_connectionQueueClientsMutex);
537 for (size_t i = 0; i < m_connectionQueueClients.size(); ++i) {
538 if (!m_connectionQueueClients[i]->willProcessMessageOnClientRunLoop(this, incomingMessage.messageID(), incomingMessage.arguments())) {
539 // A connection queue client handled the message, our work here is done.
540 incomingMessage.releaseArguments();
546 enqueueIncomingMessage(incomingMessage);
// Schedules connectionDidClose() to run on the connection work queue.
549 void Connection::postConnectionDidCloseOnConnectionWorkQueue()
551 m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::connectionDidClose));
// Handles the connection closing. It invalidates the platform connection and
// stops waiting for sync replies, waking the client run loop if any reply is
// still pending. It then invokes the optional did-close callback and schedules
// dispatchConnectionDidClose() on the client run loop.
554 void Connection::connectionDidClose()
556 // The connection is now invalid.
557 platformInvalidate();
560 MutexLocker locker(m_syncReplyStateMutex);
562 ASSERT(m_shouldWaitForSyncReplies);
563 m_shouldWaitForSyncReplies = false;
// A blocked waitForSyncReply() re-checks m_shouldWaitForSyncReplies once woken.
565 if (!m_pendingSyncReplies.isEmpty())
566 m_syncMessageState->wakeUpClientRunLoop();
569 if (m_didCloseOnConnectionWorkQueueCallback)
570 m_didCloseOnConnectionWorkQueueCallback(m_connectionQueue, this);
572 m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
// Scheduled on the client run loop by connectionDidClose(). Notifies the client
// that the connection closed by calling didClose() through a local copy of m_client.
// NOTE(review): the null-client early return and the statement that clears
// m_client are not visible in this view.
575 void Connection::dispatchConnectionDidClose()
577 // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
578 // then the client will be null here.
583 // Because we define a connection as being "valid" based on whether it has a null client, we null out
584 // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
585 // will then wait indefinitely for a reply.
586 Client* client = m_client;
589 client->didClose(this);
// True only when the connection is connected and the platform layer reports it
// can send outgoing messages.
592 bool Connection::canSendOutgoingMessages() const
594 return m_isConnected && platformCanSendOutgoingMessages();
// Scheduled on the connection work queue by sendMessage(). Takes the first
// queued outgoing message (under m_outgoingMessagesLock) and passes it to
// sendOutgoingMessage(), which receives ownership of the arguments.
// NOTE(review): the loop construct and the break/return statements are not
// visible in this view.
597 void Connection::sendOutgoingMessages()
599 if (!canSendOutgoingMessages())
603 OutgoingMessage message;
605 MutexLocker locker(m_outgoingMessagesLock);
606 if (m_outgoingMessages.isEmpty())
608 message = m_outgoingMessages.takeFirst();
611 if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
// Dispatches an incoming synchronous message to the client and sends back the reply.
// The sync request ID is decoded from arguments; a decode failure or an ID of zero
// marks the arguments invalid. The reply encoder is created with the request ID as
// its destination ID, handed to the client together with the decoder, and then
// sent via sendSyncReply().
// NOTE(review): the return after markInvalid() is not visible in this view.
616 void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments)
618 ASSERT(messageID.isSync());
620 uint64_t syncRequestID = 0;
621 if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) {
622 // We received an invalid sync message.
623 arguments->markInvalid();
627 OwnPtr<ArgumentEncoder> replyEncoder = ArgumentEncoder::create(syncRequestID);
629 // Hand off both the decoder and encoder to the client.
630 m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder);
632 // FIXME: If the message was invalid, we should send back a SyncMessageError.
633 ASSERT(!arguments->isInvalid());
636 sendSyncReply(replyEncoder.release());
// Called by sendSyncMessage() on its failure paths. Checks
// m_shouldExitOnSyncMessageSendFailure.
// NOTE(review): the statements on either side of this check are not visible in
// this view; the flag name suggests the process exits when it is set — confirm.
639 void Connection::didFailToSendSyncMessage()
641 if (!m_shouldExitOnSyncMessageSendFailure)
// Appends incomingMessage to m_incomingMessages while holding
// m_incomingMessagesLock, and schedules dispatchMessages() on the client run loop.
647 void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage)
649 MutexLocker locker(m_incomingMessagesLock);
650 m_incomingMessages.append(incomingMessage);
652 m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
// Delivers one incoming message to the client. Takes ownership of the
// message's decoder, bumps the in-dispatch counters for the duration of the
// call, and routes sync messages to dispatchSyncMessage() and other messages to
// the client's didReceiveMessage(). m_didReceiveInvalidMessage is saved,
// cleared for this dispatch, and restored at the end, so each dispatch tracks
// invalidity separately. If the message turned out invalid and a client
// still exists, the client's didReceiveInvalidMessage() is called.
// NOTE(review): the null-client early return and the `else` keyword between the
// two dispatch calls are not visible in this view.
655 void Connection::dispatchMessage(IncomingMessage& message)
657 OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
659 // If there's no client, return. We do this after calling releaseArguments so that
660 // the ArgumentDecoder message will be freed.
664 m_inDispatchMessageCount++;
// This counter is consulted by sendMessage() when deciding whether to add the
// DispatchMessageWhenWaitingForSyncReply flag.
666 if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
667 m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
669 bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
670 m_didReceiveInvalidMessage = false;
672 if (message.messageID().isSync())
673 dispatchSyncMessage(message.messageID(), arguments.get());
675 m_client->didReceiveMessage(this, message.messageID(), arguments.get());
// Combine the decoder's invalid state with any markCurrentlyDispatchedMessageAsInvalid() call.
677 m_didReceiveInvalidMessage |= arguments->isInvalid();
678 m_inDispatchMessageCount--;
680 if (message.messageID().shouldDispatchMessageWhenWaitingForSyncReply())
681 m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
683 if (m_didReceiveInvalidMessage && m_client)
684 m_client->didReceiveInvalidMessage(this, message.messageID());
686 m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
// Scheduled on the client run loop by enqueueIncomingMessage(). Takes the first
// message from m_incomingMessages (under m_incomingMessagesLock) and passes it
// to dispatchMessage().
// NOTE(review): the loop construct and the return on an empty queue are not
// visible in this view.
689 void Connection::dispatchMessages()
692 IncomingMessage incomingMessage;
695 MutexLocker locker(m_incomingMessagesLock);
696 if (m_incomingMessages.isEmpty())
699 incomingMessage = m_incomingMessages.takeFirst();
702 dispatchMessage(incomingMessage);
706 } // namespace CoreIPC