Make it possible for waitForAndDispatchImmediately to bail if a sync message comes...
authortimothy_horton@apple.com <timothy_horton@apple.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Wed, 11 Jun 2014 20:37:59 +0000 (20:37 +0000)
committertimothy_horton@apple.com <timothy_horton@apple.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Wed, 11 Jun 2014 20:37:59 +0000 (20:37 +0000)
https://bugs.webkit.org/show_bug.cgi?id=133708

Reviewed by Anders Carlsson.

* Platform/IPC/Connection.cpp:
(IPC::WaitForMessageState):
(IPC::Connection::Connection):
(IPC::Connection::waitForMessage):
(IPC::Connection::processIncomingMessage):
(IPC::Connection::connectionDidClose):
* Platform/IPC/Connection.h:
(IPC::Connection::waitForAndDispatchImmediately):
Remove the waitForMessageMap, and assert that we're only ever waiting for one message at a time.
This simplifies this code a bit, and we never wait on more than one message at a time, so it was unnecessary.

Add a flag to waitForAndDispatchImmediately, InterruptWaitingIfSyncMessageArrives, which will cause
waitForAndDispatchImmediately to bail if a sync message arrives, to avoid pointlessly blocking both processes
for the entire timeout.

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@169825 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Source/WebKit2/ChangeLog
Source/WebKit2/Platform/IPC/Connection.cpp
Source/WebKit2/Platform/IPC/Connection.h

index 441206d..2483f9a 100644 (file)
@@ -1,3 +1,25 @@
+2014-06-11  Timothy Horton  <timothy_horton@apple.com>
+
+        Make it possible for waitForAndDispatchImmediately to bail if a sync message comes in from the other direction
+        https://bugs.webkit.org/show_bug.cgi?id=133708
+
+        Reviewed by Anders Carlsson.
+
+        * Platform/IPC/Connection.cpp:
+        (IPC::WaitForMessageState):
+        (IPC::Connection::Connection):
+        (IPC::Connection::waitForMessage):
+        (IPC::Connection::processIncomingMessage):
+        (IPC::Connection::connectionDidClose):
+        * Platform/IPC/Connection.h:
+        (IPC::Connection::waitForAndDispatchImmediately):
+        Remove the waitForMessageMap, and assert that we're only ever waiting for one message at a time.
+        This simplifies this code a bit, and we never wait on more than one message at a time, so it was unnecessary.
+
+        Add a flag to waitForAndDispatchImmediately, InterruptWaitingIfSyncMessageArrives, which will cause
+        waitForAndDispatchImmediately to bail if a sync message arrives, to avoid pointlessly blocking both processes
+        for the entire timeout.
+
 2014-06-11  Oliver Hunt  <oliver@apple.com>
 
         Restrict database process profile
index f240d07..6eb900c 100644 (file)
 
 namespace IPC {
 
+struct WaitForMessageState {
+    WaitForMessageState(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, unsigned waitForMessageFlags)
+        : messageReceiverName(messageReceiverName)
+        , messageName(messageName)
+        , destinationID(destinationID)
+        , waitForMessageFlags(waitForMessageFlags)
+    {
+    }
+
+    StringReference messageReceiverName;
+    StringReference messageName;
+    uint64_t destinationID;
+
+    unsigned waitForMessageFlags;
+    bool messageWaitingInterrupted = false;
+
+    std::unique_ptr<MessageDecoder> decoder;
+};
+
 class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
 public:
     static PassRefPtr<SyncMessageState> getOrCreate(RunLoop&);
@@ -225,7 +244,7 @@ Connection::Connection(Identifier identifier, bool isServer, Client* client, Run
     , m_inDispatchMessageCount(0)
     , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
     , m_didReceiveInvalidMessage(false)
-    , m_messageWaitingInterruptedByClosedConnection(false)
+    , m_waitingForMessage(nullptr)
     , m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
     , m_shouldWaitForSyncReplies(true)
 {
@@ -371,8 +390,10 @@ bool Connection::sendSyncReply(std::unique_ptr<MessageEncoder> encoder)
     return sendMessage(std::move(encoder));
 }
 
-std::unique_ptr<MessageDecoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout)
+std::unique_ptr<MessageDecoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags)
 {
+    ASSERT(&m_clientRunLoop == &RunLoop::current());
+
     // First, check if this message is already in the incoming messages queue.
     {
         MutexLocker locker(m_incomingMessagesLock);
@@ -389,45 +410,36 @@ std::unique_ptr<MessageDecoder> Connection::waitForMessage(StringReference messa
         }
     }
 
-    std::pair<std::pair<StringReference, StringReference>, uint64_t> messageAndDestination(std::make_pair(std::make_pair(messageReceiverName, messageName), destinationID));
-    
+    WaitForMessageState waitingForMessage(messageReceiverName, messageName, destinationID, waitForMessageFlags);
+
     {
         std::lock_guard<std::mutex> lock(m_waitForMessageMutex);
 
-        // We don't support having multiple clients wait for the same message.
-        ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
-    
-        // Insert our pending wait.
-        m_waitForMessageMap.set(messageAndDestination, nullptr);
+        // We don't support having multiple clients waiting for messages.
+        ASSERT(!m_waitingForMessage);
+
+        m_waitingForMessage = &waitingForMessage;
     }
     
     // Now wait for it to be set.
     while (true) {
         std::unique_lock<std::mutex> lock(m_waitForMessageMutex);
 
-        auto it = m_waitForMessageMap.find(messageAndDestination);
-        if (it->value) {
-            std::unique_ptr<MessageDecoder> decoder = std::move(it->value);
-            m_waitForMessageMap.remove(it);
-
+        if (m_waitingForMessage->decoder) {
+            auto decoder = std::move(m_waitingForMessage->decoder);
+            m_waitingForMessage = nullptr;
             return decoder;
         }
 
         // Now we wait.
         std::cv_status status = m_waitForMessageCondition.wait_for(lock, timeout);
-        if (status == std::cv_status::timeout || m_messageWaitingInterruptedByClosedConnection) {
-            // We timed out or lost our connection, now remove the pending wait.
-            m_waitForMessageMap.remove(messageAndDestination);
-
-            // If there are no more waiters, reset m_messageWaitingInterruptedByClosedConnection while we already hold
-            // the necessary lock in case the connection is later reopened.
-            if (m_waitForMessageMap.isEmpty())
-                m_messageWaitingInterruptedByClosedConnection = false;
-
+        // We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting.
+        if (status == std::cv_status::timeout || m_waitingForMessage->messageWaitingInterrupted)
             break;
-        }
     }
 
+    m_waitingForMessage = nullptr;
+
     return nullptr;
 }
 
@@ -647,14 +659,17 @@ void Connection::processIncomingMessage(std::unique_ptr<MessageDecoder> message)
     {
         std::lock_guard<std::mutex> lock(m_waitForMessageMutex);
 
-        auto it = m_waitForMessageMap.find(std::make_pair(std::make_pair(message->messageReceiverName(), message->messageName()), message->destinationID()));
-        if (it != m_waitForMessageMap.end()) {
-            it->value = std::move(message);
-            ASSERT(it->value);
-        
+        if (m_waitingForMessage && m_waitingForMessage->messageReceiverName == message->messageReceiverName() && m_waitingForMessage->messageName == message->messageName() && m_waitingForMessage->destinationID == message->destinationID()) {
+            m_waitingForMessage->decoder = std::move(message);
+            ASSERT(m_waitingForMessage->decoder);
             m_waitForMessageCondition.notify_one();
             return;
         }
+
+        if (m_waitingForMessage && (m_waitingForMessage->waitForMessageFlags & InterruptWaitingIfSyncMessageArrives) && message->isSyncMessage()) {
+            m_waitingForMessage->messageWaitingInterrupted = true;
+            m_waitForMessageCondition.notify_one();
+        }
     }
 
     enqueueIncomingMessage(std::move(message));
@@ -685,7 +700,8 @@ void Connection::connectionDidClose()
 
     {
         std::lock_guard<std::mutex> lock(m_waitForMessageMutex);
-        m_messageWaitingInterruptedByClosedConnection = true;
+        if (m_waitingForMessage)
+            m_waitingForMessage->messageWaitingInterrupted = true;
     }
     m_waitForMessageCondition.notify_all();
 
index a516c1f..a9acd77 100644 (file)
@@ -55,6 +55,8 @@ class RunLoop;
 
 namespace IPC {
 
+struct WaitForMessageState;
+
 enum MessageSendFlags {
     // Whether this message should be dispatched when waiting for a sync reply.
     // This is the default for synchronous messages.
@@ -68,6 +70,11 @@ enum SyncMessageSendFlags {
     // FIXME (126021): Remove when no platforms need to support this.
     SpinRunLoopWhileWaitingForReply = 1 << 1,
 };
+
+enum WaitForMessageFlags {
+    // Use this to make waitForMessage be interrupted immediately by any incoming sync messages.
+    InterruptWaitingIfSyncMessageArrives = 1 << 0,
+};
     
 #define MESSAGE_CHECK_BASE(assertion, connection) do \
     if (!(assertion)) { \
@@ -164,7 +171,7 @@ public:
 
     template<typename T> bool send(T&& message, uint64_t destinationID, unsigned messageSendFlags = 0);
     template<typename T> bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), unsigned syncSendFlags = 0);
-    template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout);
+    template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags = 0);
 
     std::unique_ptr<MessageEncoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID);
     bool sendMessage(std::unique_ptr<MessageEncoder>, unsigned messageSendFlags = 0);
@@ -193,7 +200,7 @@ private:
     
     bool isValid() const { return m_client; }
     
-    std::unique_ptr<MessageDecoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout);
+    std::unique_ptr<MessageDecoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags);
     
     std::unique_ptr<MessageDecoder> waitForSyncReply(uint64_t syncRequestID, std::chrono::milliseconds timeout, unsigned syncSendFlags);
 
@@ -253,8 +260,8 @@ private:
     
     std::condition_variable m_waitForMessageCondition;
     std::mutex m_waitForMessageMutex;
-    bool m_messageWaitingInterruptedByClosedConnection;
-    HashMap<std::pair<std::pair<StringReference, StringReference>, uint64_t>, std::unique_ptr<MessageDecoder>> m_waitForMessageMap;
+
+    WaitForMessageState* m_waitingForMessage;
 
     // Represents a sync request for which we're waiting on a reply.
     struct PendingSyncReply {
@@ -357,9 +364,9 @@ template<typename T> bool Connection::sendSync(T&& message, typename T::Reply&&
     return replyDecoder->decode(reply);
 }
 
-template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout)
+template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags)
 {
-    std::unique_ptr<MessageDecoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout);
+    std::unique_ptr<MessageDecoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout, waitForMessageFlags);
     if (!decoder)
         return false;