2011-02-11 Anders Carlsson <andersca@apple.com>
authorandersca@apple.com <andersca@apple.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Sat, 12 Feb 2011 01:45:24 +0000 (01:45 +0000)
committerandersca@apple.com <andersca@apple.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Sat, 12 Feb 2011 01:45:24 +0000 (01:45 +0000)
        Reviewed by Sam Weinig.

        Incoming synchronous messages should always be processed regardless of connection waiting for a reply
        https://bugs.webkit.org/show_bug.cgi?id=54326

        * Platform/CoreIPC/Connection.cpp:
        (CoreIPC::Connection::SyncMessageState::getOrCreate):
        Assert that the map has a non-null SyncMessageState object.

        (CoreIPC::Connection::SyncMessageState::SyncMessageState):
        Initialize m_waitForSyncReplyCount.

        (CoreIPC::Connection::SyncMessageState::beginWaitForSyncReply):
        Increment m_waitForSyncReplyCount.

        (CoreIPC::Connection::SyncMessageState::endWaitForSyncReply):
        Decrement m_waitForSyncReplyCount. If it's 0, enqueue any incoming sync messages.

        (CoreIPC::Connection::SyncMessageState::processIncomingMessage):
        If this is a message that needs to be dispatched, add it to the queue and wake up the client run loop.

        (CoreIPC::Connection::SyncMessageState::dispatchMessages):
        Go through the queue of incoming messages and dispatch them.

        (CoreIPC::Connection::sendSyncMessage):
        call beginWaitForSyncReply/endWaitForSyncReply.

        (CoreIPC::Connection::waitForSyncReply):
        Dispatch messages.

        (CoreIPC::Connection::processIncomingMessage):
        Call SyncMessageState::processIncomingMessage.

        (CoreIPC::Connection::enqueueIncomingMessage):
        Add helper function for enqueuing an incoming message.

        * Platform/CoreIPC/Connection.h:
        (CoreIPC::Connection::Message::destinationID):

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@78399 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Source/WebKit2/ChangeLog
Source/WebKit2/Platform/CoreIPC/Connection.cpp
Source/WebKit2/Platform/CoreIPC/Connection.h

index d8ba363..2d5cba4 100644 (file)
@@ -1,3 +1,44 @@
+2011-02-11  Anders Carlsson  <andersca@apple.com>
+
+        Reviewed by Sam Weinig.
+
+        Incoming synchronous messages should always be processed regardless of connection waiting for a reply
+        https://bugs.webkit.org/show_bug.cgi?id=54326
+
+        * Platform/CoreIPC/Connection.cpp:
+        (CoreIPC::Connection::SyncMessageState::getOrCreate):
+        Assert that the map has a non-null SyncMessageState object.
+
+        (CoreIPC::Connection::SyncMessageState::SyncMessageState):
+        Initialize m_waitForSyncReplyCount.
+
+        (CoreIPC::Connection::SyncMessageState::beginWaitForSyncReply):
+        Increment m_waitForSyncReplyCount.
+
+        (CoreIPC::Connection::SyncMessageState::endWaitForSyncReply):
+        Decrement m_waitForSyncReplyCount. If it's 0, enqueue any incoming sync messages.
+
+        (CoreIPC::Connection::SyncMessageState::processIncomingMessage):
+        If this is a message that needs to be dispatched, add it to the queue and wake up the client run loop.
+
+        (CoreIPC::Connection::SyncMessageState::dispatchMessages):
+        Go through the queue of incoming messages and dispatch them.
+
+        (CoreIPC::Connection::sendSyncMessage):
+        call beginWaitForSyncReply/endWaitForSyncReply.
+
+        (CoreIPC::Connection::waitForSyncReply):
+        Dispatch messages.
+
+        (CoreIPC::Connection::processIncomingMessage):
+        Call SyncMessageState::processIncomingMessage.
+
+        (CoreIPC::Connection::enqueueIncomingMessage):
+        Add helper function for enqueuing an incoming message.
+
+        * Platform/CoreIPC/Connection.h:
+        (CoreIPC::Connection::Message::destinationID):
+
 2011-02-11  Sam Weinig  <sam@webkit.org>
 
         Reviewed by Maciej Stachowiak.
index 543aeb8..0ac4e3d 100644 (file)
@@ -41,6 +41,9 @@ public:
     static PassRefPtr<SyncMessageState> getOrCreate(RunLoop*);
     ~SyncMessageState();
 
+    void beginWaitForSyncReply();
+    void endWaitForSyncReply();
+
     void wakeUpClientRunLoop()
     {
         m_waitForSyncReplySemaphore.signal();
@@ -51,6 +54,12 @@ public:
         return m_waitForSyncReplySemaphore.wait(absoluteTime);
     }
 
+    // Returns true if this message will be handled on a client thread that is currently
+    // waiting for a reply to a synchronous message.
+    bool processIncomingMessage(Connection*, IncomingMessage&);
+
+    void dispatchMessages();
+
 private:
     explicit SyncMessageState(RunLoop*);
 
@@ -69,6 +78,17 @@ private:
 
     RunLoop* m_runLoop;
     BinarySemaphore m_waitForSyncReplySemaphore;
+
+    // Protects m_waitForSyncReplyCount and m_messagesToDispatchWhileWaitingForSyncReply.
+    Mutex m_mutex;
+
+    unsigned m_waitForSyncReplyCount;
+
+    struct ConnectionAndIncomingMessage {
+        Connection* connection;
+        IncomingMessage incomingMessage;
+    };
+    Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
 };
 
 PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCreate(RunLoop* runLoop)
@@ -76,8 +96,10 @@ PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCrea
     MutexLocker locker(syncMessageStateMapMutex());
     pair<SyncMessageStateMap::iterator, bool> result = syncMessageStateMap().add(runLoop, 0);
 
-    if (!result.second)
+    if (!result.second) {
+        ASSERT(result.first->second);
         return result.first->second;
+    }
 
     RefPtr<SyncMessageState> syncMessageState = adoptRef(new SyncMessageState(runLoop));
     result.first->second = syncMessageState.get();
@@ -87,6 +109,7 @@ PassRefPtr<Connection::SyncMessageState> Connection::SyncMessageState::getOrCrea
 
 Connection::SyncMessageState::SyncMessageState(RunLoop* runLoop)
     : m_runLoop(runLoop)
+    , m_waitForSyncReplyCount(0)
 {
 }
 
@@ -98,6 +121,71 @@ Connection::SyncMessageState::~SyncMessageState()
     syncMessageStateMap().remove(m_runLoop);
 }
 
+void Connection::SyncMessageState::beginWaitForSyncReply()
+{
+    ASSERT(RunLoop::current() == m_runLoop);
+
+    MutexLocker locker(m_mutex);
+    m_waitForSyncReplyCount++;
+}
+
+void Connection::SyncMessageState::endWaitForSyncReply()
+{
+    ASSERT(RunLoop::current() == m_runLoop);
+
+    MutexLocker locker(m_mutex);
+    ASSERT(m_waitForSyncReplyCount);
+    --m_waitForSyncReplyCount;
+
+    if (m_waitForSyncReplyCount)
+        return;
+
+    // Dispatch any remaining incoming sync messages.
+    for (size_t i = 0; i < m_messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
+        ConnectionAndIncomingMessage& connectionAndIncomingMessage = m_messagesToDispatchWhileWaitingForSyncReply[i];
+        connectionAndIncomingMessage.connection->enqueueIncomingMessage(connectionAndIncomingMessage.incomingMessage);
+    }
+
+    m_messagesToDispatchWhileWaitingForSyncReply.clear();
+}
+
+bool Connection::SyncMessageState::processIncomingMessage(Connection* connection, IncomingMessage& incomingMessage)
+{
+    MessageID messageID = incomingMessage.messageID();
+    if (!messageID.isSync() && !messageID.shouldDispatchMessageWhenWaitingForSyncReply())
+        return false;
+
+    MutexLocker locker(m_mutex);
+    if (!m_waitForSyncReplyCount)
+        return false;
+
+    ConnectionAndIncomingMessage connectionAndIncomingMessage;
+    connectionAndIncomingMessage.connection = connection;
+    connectionAndIncomingMessage.incomingMessage = incomingMessage;
+
+    m_messagesToDispatchWhileWaitingForSyncReply.append(connectionAndIncomingMessage);
+    wakeUpClientRunLoop();
+
+    return true;
+}
+
+void Connection::SyncMessageState::dispatchMessages()
+{
+    ASSERT(m_runLoop == RunLoop::current());
+
+    Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
+
+    {
+        MutexLocker locker(m_mutex);
+        m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
+    }
+
+    for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
+        ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
+        connectionAndIncomingMessage.connection->dispatchMessage(connectionAndIncomingMessage.incomingMessage);
+    }
+}
+
 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
 {
     return adoptRef(new Connection(identifier, true, client, clientRunLoop));
@@ -273,7 +361,9 @@ PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uin
     
     // First send the message.
     sendMessage(messageID, encoder);
-    
+
+    m_syncMessageState->beginWaitForSyncReply();
+
     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
     // keep an extra reference to the connection here in case it's invalidated.
     RefPtr<Connection> protect(this);
@@ -284,22 +374,10 @@ PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uin
         MutexLocker locker(m_syncReplyStateMutex);
         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
         m_pendingSyncReplies.removeLast();
-
-        if (m_pendingSyncReplies.isEmpty()) {
-            // This was the bottom-most sendSyncMessage call in the stack. If we have any pending incoming
-            // messages, they need to be dispatched.
-            if (!m_syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) {
-                // Add the messages.
-                MutexLocker locker(m_incomingMessagesLock);
-                m_incomingMessages.append(m_syncMessagesReceivedWhileWaitingForSyncReply);
-                m_syncMessagesReceivedWhileWaitingForSyncReply.clear();
-
-                // Schedule for the messages to be sent.
-                m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
-            }
-        }
     }
 
+    m_syncMessageState->endWaitForSyncReply();
+
     if (!reply)
         m_client->didFailToSendSyncMessage(this);
 
@@ -312,30 +390,12 @@ PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID,
 
     bool timedOut = false;
     while (!timedOut) {
+        // First, check if we have any messages that we need to process.
+        m_syncMessageState->dispatchMessages();
+        
         {
             MutexLocker locker(m_syncReplyStateMutex);
 
-            // First, check if we have any sync messages that we need to process.
-            Vector<IncomingMessage> syncMessagesReceivedWhileWaitingForSyncReply;
-            m_syncMessagesReceivedWhileWaitingForSyncReply.swap(syncMessagesReceivedWhileWaitingForSyncReply);
-
-            if (!syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) {
-                // Make sure to unlock the mutex here because we're calling out to client code which could in turn send
-                // another sync message and we don't want that to deadlock.
-                m_syncReplyStateMutex.unlock();
-                
-                for (size_t i = 0; i < syncMessagesReceivedWhileWaitingForSyncReply.size(); ++i) {
-                    IncomingMessage& message = syncMessagesReceivedWhileWaitingForSyncReply[i];
-                    OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
-
-                    if (message.messageID().isSync())
-                        dispatchSyncMessage(message.messageID(), arguments.get());
-                    else
-                        m_client->didReceiveMessage(this, message.messageID(), arguments.get());
-                }
-                m_syncReplyStateMutex.lock();
-            }
-
             // Second, check if there is a sync reply at the top of the stack.
             ASSERT(!m_pendingSyncReplies.isEmpty());
             
@@ -371,25 +431,19 @@ void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<Argument
         return;
     }
 
+    IncomingMessage incomingMessage(messageID, arguments);
+
     // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
     // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
     // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
-    if (messageID.isSync() || messageID.shouldDispatchMessageWhenWaitingForSyncReply()) {
-        MutexLocker locker(m_syncReplyStateMutex);
-        if (!m_pendingSyncReplies.isEmpty()) {
-            m_syncMessagesReceivedWhileWaitingForSyncReply.append(IncomingMessage(messageID, arguments));
+    if (m_syncMessageState->processIncomingMessage(this, incomingMessage))
+        return;
 
-            // The message has been added, now wake up the client thread.
-            m_syncMessageState->wakeUpClientRunLoop();
-            return;
-        }
-    }
-        
     // Check if we're waiting for this message.
     {
         MutexLocker locker(m_waitForMessageMutex);
         
-        HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), arguments->destinationID()));
+        HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), incomingMessage.destinationID()));
         if (it != m_waitForMessageMap.end()) {
             it->second = arguments.leakPtr();
         
@@ -398,10 +452,7 @@ void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<Argument
         }
     }
 
-    MutexLocker locker(m_incomingMessagesLock);
-    m_incomingMessages.append(IncomingMessage(messageID, arguments));
-
-    m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
+    enqueueIncomingMessage(incomingMessage);
 }
 
 void Connection::connectionDidClose()
@@ -498,6 +549,14 @@ void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* argum
     sendSyncReply(replyEncoder);
 }
 
+void Connection::enqueueIncomingMessage(IncomingMessage& incomingMessage)
+{
+    MutexLocker locker(m_incomingMessagesLock);
+    m_incomingMessages.append(incomingMessage);
+
+    m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
+}
+
 void Connection::dispatchMessage(IncomingMessage& message)
 {
     OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
index 935ae9c..a895011 100644 (file)
@@ -157,6 +157,8 @@ private:
         }
         
         MessageID messageID() const { return m_messageID; }
+        uint64_t destinationID() const { return m_arguments->destinationID(); }
+
         T* arguments() const { return m_arguments; }
         
         PassOwnPtr<T> releaseArguments()
@@ -202,7 +204,10 @@ private:
     void dispatchMessage(IncomingMessage&);
     void dispatchMessages();
     void dispatchSyncMessage(MessageID, ArgumentDecoder*);
-                             
+
+    // Can be called on any thread.
+    void enqueueIncomingMessage(IncomingMessage&);
+
     Client* m_client;
     bool m_isServer;
     uint64_t m_syncRequestID;
@@ -270,7 +275,6 @@ private:
     Mutex m_syncReplyStateMutex;
     bool m_shouldWaitForSyncReplies;
     Vector<PendingSyncReply> m_pendingSyncReplies;
-    Vector<IncomingMessage> m_syncMessagesReceivedWhileWaitingForSyncReply;
 
 #if PLATFORM(MAC)
     // Called on the connection queue.