+2014-06-11 Timothy Horton <timothy_horton@apple.com>
+
+ Make it possible for waitForAndDispatchImmediately to bail if a sync message comes in from the other direction
+ https://bugs.webkit.org/show_bug.cgi?id=133708
+
+ Reviewed by Anders Carlsson.
+
+ * Platform/IPC/Connection.cpp:
+ (IPC::WaitForMessageState):
+ (IPC::Connection::Connection):
+ (IPC::Connection::waitForMessage):
+ (IPC::Connection::processIncomingMessage):
+ (IPC::Connection::connectionDidClose):
+ * Platform/IPC/Connection.h:
+ (IPC::Connection::waitForAndDispatchImmediately):
+ Remove the waitForMessageMap, and assert that we're only ever waiting for one message at a time.
+ This simplifies this code a bit, and we never wait on more than one message at a time, so it was unnecessary.
+
+ Add a flag to waitForAndDispatchImmediately, InterruptWaitingIfSyncMessageArrives, which will cause
+ waitForAndDispatchImmediately to bail if a sync message arrives, to avoid pointlessly blocking both processes
+ for the entire timeout.
+
2014-06-11 Oliver Hunt <oliver@apple.com>
Restrict database process profile
namespace IPC {
+struct WaitForMessageState {
+ WaitForMessageState(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, unsigned waitForMessageFlags)
+ : messageReceiverName(messageReceiverName)
+ , messageName(messageName)
+ , destinationID(destinationID)
+ , waitForMessageFlags(waitForMessageFlags)
+ {
+ }
+
+ StringReference messageReceiverName;
+ StringReference messageName;
+ uint64_t destinationID;
+
+ unsigned waitForMessageFlags;
+ bool messageWaitingInterrupted = false;
+
+ std::unique_ptr<MessageDecoder> decoder;
+};
+
class Connection::SyncMessageState : public ThreadSafeRefCounted<Connection::SyncMessageState> {
public:
static PassRefPtr<SyncMessageState> getOrCreate(RunLoop&);
, m_inDispatchMessageCount(0)
, m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
, m_didReceiveInvalidMessage(false)
- , m_messageWaitingInterruptedByClosedConnection(false)
+ , m_waitingForMessage(nullptr)
, m_syncMessageState(SyncMessageState::getOrCreate(clientRunLoop))
, m_shouldWaitForSyncReplies(true)
{
return sendMessage(std::move(encoder));
}
-std::unique_ptr<MessageDecoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout)
+std::unique_ptr<MessageDecoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags)
{
+ ASSERT(&m_clientRunLoop == &RunLoop::current());
+
// First, check if this message is already in the incoming messages queue.
{
MutexLocker locker(m_incomingMessagesLock);
}
}
- std::pair<std::pair<StringReference, StringReference>, uint64_t> messageAndDestination(std::make_pair(std::make_pair(messageReceiverName, messageName), destinationID));
-
+ WaitForMessageState waitingForMessage(messageReceiverName, messageName, destinationID, waitForMessageFlags);
+
{
std::lock_guard<std::mutex> lock(m_waitForMessageMutex);
- // We don't support having multiple clients wait for the same message.
- ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
-
- // Insert our pending wait.
- m_waitForMessageMap.set(messageAndDestination, nullptr);
+ // We don't support having multiple clients waiting for messages.
+ ASSERT(!m_waitingForMessage);
+
+ m_waitingForMessage = &waitingForMessage;
}
// Now wait for it to be set.
while (true) {
std::unique_lock<std::mutex> lock(m_waitForMessageMutex);
- auto it = m_waitForMessageMap.find(messageAndDestination);
- if (it->value) {
- std::unique_ptr<MessageDecoder> decoder = std::move(it->value);
- m_waitForMessageMap.remove(it);
-
+ if (m_waitingForMessage->decoder) {
+ auto decoder = std::move(m_waitingForMessage->decoder);
+ m_waitingForMessage = nullptr;
return decoder;
}
// Now we wait.
std::cv_status status = m_waitForMessageCondition.wait_for(lock, timeout);
- if (status == std::cv_status::timeout || m_messageWaitingInterruptedByClosedConnection) {
- // We timed out or lost our connection, now remove the pending wait.
- m_waitForMessageMap.remove(messageAndDestination);
-
- // If there are no more waiters, reset m_messageWaitingInterruptedByClosedConnection while we already hold
- // the necessary lock in case the connection is later reopened.
- if (m_waitForMessageMap.isEmpty())
- m_messageWaitingInterruptedByClosedConnection = false;
-
+ // We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting.
+ if (status == std::cv_status::timeout || m_waitingForMessage->messageWaitingInterrupted)
break;
- }
}
+ m_waitingForMessage = nullptr;
+
return nullptr;
}
{
std::lock_guard<std::mutex> lock(m_waitForMessageMutex);
- auto it = m_waitForMessageMap.find(std::make_pair(std::make_pair(message->messageReceiverName(), message->messageName()), message->destinationID()));
- if (it != m_waitForMessageMap.end()) {
- it->value = std::move(message);
- ASSERT(it->value);
-
+ if (m_waitingForMessage && m_waitingForMessage->messageReceiverName == message->messageReceiverName() && m_waitingForMessage->messageName == message->messageName() && m_waitingForMessage->destinationID == message->destinationID()) {
+ m_waitingForMessage->decoder = std::move(message);
+ ASSERT(m_waitingForMessage->decoder);
m_waitForMessageCondition.notify_one();
return;
}
+
+ if (m_waitingForMessage && (m_waitingForMessage->waitForMessageFlags & InterruptWaitingIfSyncMessageArrives) && message->isSyncMessage()) {
+ m_waitingForMessage->messageWaitingInterrupted = true;
+ m_waitForMessageCondition.notify_one();
+ }
}
enqueueIncomingMessage(std::move(message));
{
std::lock_guard<std::mutex> lock(m_waitForMessageMutex);
- m_messageWaitingInterruptedByClosedConnection = true;
+ if (m_waitingForMessage)
+ m_waitingForMessage->messageWaitingInterrupted = true;
}
m_waitForMessageCondition.notify_all();
namespace IPC {
+struct WaitForMessageState;
+
enum MessageSendFlags {
// Whether this message should be dispatched when waiting for a sync reply.
// This is the default for synchronous messages.
// FIXME (126021): Remove when no platforms need to support this.
SpinRunLoopWhileWaitingForReply = 1 << 1,
};
+
+enum WaitForMessageFlags {
+ // Use this to make waitForMessage be interrupted immediately by any incoming sync messages.
+ InterruptWaitingIfSyncMessageArrives = 1 << 0,
+};
#define MESSAGE_CHECK_BASE(assertion, connection) do \
if (!(assertion)) { \
template<typename T> bool send(T&& message, uint64_t destinationID, unsigned messageSendFlags = 0);
template<typename T> bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), unsigned syncSendFlags = 0);
- template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout);
+ template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags = 0);
std::unique_ptr<MessageEncoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID);
bool sendMessage(std::unique_ptr<MessageEncoder>, unsigned messageSendFlags = 0);
bool isValid() const { return m_client; }
- std::unique_ptr<MessageDecoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout);
+ std::unique_ptr<MessageDecoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags);
std::unique_ptr<MessageDecoder> waitForSyncReply(uint64_t syncRequestID, std::chrono::milliseconds timeout, unsigned syncSendFlags);
std::condition_variable m_waitForMessageCondition;
std::mutex m_waitForMessageMutex;
- bool m_messageWaitingInterruptedByClosedConnection;
- HashMap<std::pair<std::pair<StringReference, StringReference>, uint64_t>, std::unique_ptr<MessageDecoder>> m_waitForMessageMap;
+
+ WaitForMessageState* m_waitingForMessage;
// Represents a sync request for which we're waiting on a reply.
struct PendingSyncReply {
return replyDecoder->decode(reply);
}
-template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout)
+template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, std::chrono::milliseconds timeout, unsigned waitForMessageFlags)
{
- std::unique_ptr<MessageDecoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout);
+ std::unique_ptr<MessageDecoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout, waitForMessageFlags);
if (!decoder)
return false;