2 * Copyright (C) 2010 Apple Inc. All rights reserved.
3 * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies)
4 * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved.
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
25 * THE POSSIBILITY OF SUCH DAMAGE.
31 #include "ArgumentDecoder.h"
32 #include "ArgumentEncoder.h"
33 #include "Arguments.h"
34 #include "MessageID.h"
35 #include "WorkQueue.h"
36 #include <wtf/HashMap.h>
37 #include <wtf/PassRefPtr.h>
38 #include <wtf/OwnPtr.h>
39 #include <wtf/Threading.h>
42 #include <mach/mach_port.h>
46 class QSocketNotifier;
49 #if PLATFORM(QT) || PLATFORM(GTK)
50 #include "PlatformProcessIdentifier.h"
59 enum MessageSendFlags {
60 // Whether this message should be dispatched when waiting for a sync reply.
61 // This is the default for synchronous messages.
62 DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0,
65 #define MESSAGE_CHECK_BASE(assertion, connection) do \
68 (connection)->markCurrentlyDispatchedMessageAsInvalid(); \
73 class Connection : public ThreadSafeRefCounted<Connection> {
75 class MessageReceiver {
77 virtual void didReceiveMessage(Connection*, MessageID, ArgumentDecoder*) = 0;
78 virtual void didReceiveSyncMessage(Connection*, MessageID, ArgumentDecoder*, OwnPtr<ArgumentEncoder>&) { ASSERT_NOT_REACHED(); }
81 virtual ~MessageReceiver() { }
84 class Client : public MessageReceiver {
86 virtual void didClose(Connection*) = 0;
87 virtual void didReceiveInvalidMessage(Connection*, MessageID) = 0;
88 virtual void syncMessageSendTimedOut(Connection*) = 0;
91 virtual Vector<HWND> windowsToReceiveSentMessagesWhileWaitingForSyncReply() = 0;
100 virtual bool willProcessMessageOnClientRunLoop(Connection*, MessageID, ArgumentDecoder*) = 0;
103 virtual ~QueueClient() { }
107 typedef mach_port_t Identifier;
109 typedef HANDLE Identifier;
110 static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier);
111 #elif USE(UNIX_DOMAIN_SOCKETS) || OS(SYMBIAN)
112 typedef int Identifier;
115 static PassRefPtr<Connection> createServerConnection(Identifier, Client*, RunLoop* clientRunLoop);
116 static PassRefPtr<Connection> createClientConnection(Identifier, Client*, RunLoop* clientRunLoop);
120 void setShouldCloseConnectionOnMachExceptions();
121 #elif PLATFORM(QT) || PLATFORM(GTK)
122 void setShouldCloseConnectionOnProcessTermination(WebKit::PlatformProcessIdentifier);
125 void setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool);
126 void setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure);
128 // The set callback will be called on the connection work queue when the connection is closed,
129 // before didCall is called on the client thread. Must be called before the connection is opened.
130 // In the future we might want a more generic way to handle sync or async messages directly
131 // on the work queue, for example if we want to handle them on some other thread we could avoid
132 // handling the message on the client thread first.
133 typedef void (*DidCloseOnConnectionWorkQueueCallback)(WorkQueue&, Connection*);
134 void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback);
136 void addQueueClient(QueueClient*);
137 void removeQueueClient(QueueClient*);
141 void markCurrentlyDispatchedMessageAsInvalid();
143 void setDefaultSyncMessageTimeout(double);
145 void postConnectionDidCloseOnConnectionWorkQueue();
147 static const int DefaultTimeout = 0;
148 static const int NoTimeout = -1;
150 template<typename T> bool send(const T& message, uint64_t destinationID, unsigned messageSendFlags = 0);
151 template<typename T> bool sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout = DefaultTimeout);
152 template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, double timeout);
154 PassOwnPtr<ArgumentEncoder> createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID);
155 bool sendMessage(MessageID, PassOwnPtr<ArgumentEncoder>, unsigned messageSendFlags = 0);
156 bool sendSyncReply(PassOwnPtr<ArgumentEncoder>);
158 // FIXME: These variants of send, sendSync and waitFor are all deprecated.
159 // All clients should move to the overloads that take a message type.
160 template<typename E, typename T> bool deprecatedSend(E messageID, uint64_t destinationID, const T& arguments);
161 template<typename E, typename T, typename U> bool deprecatedSendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout = NoTimeout);
164 template<typename T> class Message {
171 Message(MessageID messageID, PassOwnPtr<T> arguments)
172 : m_messageID(messageID)
173 , m_arguments(arguments.leakPtr())
177 MessageID messageID() const { return m_messageID; }
178 uint64_t destinationID() const { return m_arguments->destinationID(); }
180 T* arguments() const { return m_arguments; }
182 PassOwnPtr<T> releaseArguments()
184 OwnPtr<T> arguments = adoptPtr(m_arguments);
187 return arguments.release();
191 MessageID m_messageID;
192 // The memory management of this class is very unusual. The class acts
193 // as if it has an owning reference to m_arguments (e.g., accepting a
194 // PassOwnPtr in its constructor) in all ways except that it does not
195 // deallocate m_arguments on destruction.
196 // FIXME: Does this leak m_arguments on destruction?
201 typedef Message<ArgumentEncoder> OutgoingMessage;
204 Connection(Identifier, bool isServer, Client*, RunLoop* clientRunLoop);
205 void platformInitialize(Identifier);
206 void platformInvalidate();
208 bool isValid() const { return m_client; }
210 PassOwnPtr<ArgumentDecoder> waitForMessage(MessageID, uint64_t destinationID, double timeout);
212 PassOwnPtr<ArgumentDecoder> sendSyncMessage(MessageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder>, double timeout);
213 PassOwnPtr<ArgumentDecoder> waitForSyncReply(uint64_t syncRequestID, double timeout);
215 // Called on the connection work queue.
216 void processIncomingMessage(MessageID, PassOwnPtr<ArgumentDecoder>);
217 void processIncomingSyncReply(PassOwnPtr<ArgumentDecoder>);
219 bool canSendOutgoingMessages() const;
220 bool platformCanSendOutgoingMessages() const;
221 void sendOutgoingMessages();
222 bool sendOutgoingMessage(MessageID, PassOwnPtr<ArgumentEncoder>);
223 void connectionDidClose();
225 typedef Message<ArgumentDecoder> IncomingMessage;
227 // Called on the listener thread.
228 void dispatchConnectionDidClose();
229 void dispatchMessage(IncomingMessage&);
230 void dispatchMessages();
231 void dispatchSyncMessage(MessageID, ArgumentDecoder*);
232 void didFailToSendSyncMessage();
234 // Can be called on any thread.
235 void enqueueIncomingMessage(IncomingMessage&);
239 uint64_t m_syncRequestID;
241 bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage;
242 bool m_shouldExitOnSyncMessageSendFailure;
243 DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback;
246 WorkQueue m_connectionQueue;
247 RunLoop* m_clientRunLoop;
249 Mutex m_connectionQueueClientsMutex;
250 Vector<QueueClient*> m_connectionQueueClients;
252 unsigned m_inDispatchMessageCount;
253 unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount;
254 bool m_didReceiveInvalidMessage;
256 double m_defaultSyncMessageTimeout;
258 // Incoming messages.
259 Mutex m_incomingMessagesLock;
260 Vector<IncomingMessage> m_incomingMessages;
262 // Outgoing messages.
263 Mutex m_outgoingMessagesLock;
264 Deque<OutgoingMessage> m_outgoingMessages;
266 ThreadCondition m_waitForMessageCondition;
267 Mutex m_waitForMessageMutex;
268 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*> m_waitForMessageMap;
270 // Represents a sync request for which we're waiting on a reply.
271 struct PendingSyncReply {
273 uint64_t syncRequestID;
275 // The reply decoder, will be null if there was an error processing the sync
276 // message on the other side.
277 ArgumentDecoder* replyDecoder;
279 // Will be set to true once a reply has been received or an error occurred.
280 bool didReceiveReply;
285 , didReceiveReply(false)
289 explicit PendingSyncReply(uint64_t syncRequestID)
290 : syncRequestID(syncRequestID)
296 PassOwnPtr<ArgumentDecoder> releaseReplyDecoder()
298 OwnPtr<ArgumentDecoder> reply = adoptPtr(replyDecoder);
301 return reply.release();
305 class SyncMessageState;
306 friend class SyncMessageState;
307 RefPtr<SyncMessageState> m_syncMessageState;
309 Mutex m_syncReplyStateMutex;
310 bool m_shouldWaitForSyncReplies;
311 Vector<PendingSyncReply> m_pendingSyncReplies;
314 // Called on the connection queue.
315 void receiveSourceEventHandler();
316 void initializeDeadNameSource();
317 void exceptionSourceEventHandler();
319 mach_port_t m_sendPort;
320 mach_port_t m_receivePort;
322 // If setShouldCloseConnectionOnMachExceptions has been called, this has
323 // the exception port that exceptions from the other end will be sent on.
324 mach_port_t m_exceptionPort;
327 // Called on the connection queue.
328 void readEventHandler();
329 void writeEventHandler();
331 Vector<uint8_t> m_readBuffer;
332 OVERLAPPED m_readState;
333 OwnPtr<ArgumentEncoder> m_pendingWriteArguments;
334 OVERLAPPED m_writeState;
335 HANDLE m_connectionPipe;
336 #elif USE(UNIX_DOMAIN_SOCKETS) || OS(SYMBIAN)
337 // Called on the connection queue.
338 void readyReadHandler();
339 bool processMessage();
341 Vector<uint8_t> m_readBuffer;
342 size_t m_readBufferSize;
343 Vector<int> m_fileDescriptors;
344 size_t m_fileDescriptorsSize;
345 int m_socketDescriptor;
348 QSocketNotifier* m_socketNotifier;
353 template<typename T> bool Connection::send(const T& message, uint64_t destinationID, unsigned messageSendFlags)
355 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
356 argumentEncoder->encode(message);
358 return sendMessage(MessageID(T::messageID), argumentEncoder.release(), messageSendFlags);
361 template<typename T> bool Connection::sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout)
363 uint64_t syncRequestID = 0;
364 OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID);
366 // Encode the rest of the input arguments.
367 argumentEncoder->encode(message);
369 // Now send the message and wait for a reply.
370 OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(T::messageID), syncRequestID, argumentEncoder.release(), timeout);
375 return replyDecoder->decode(const_cast<typename T::Reply&>(reply));
378 template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, double timeout)
380 OwnPtr<ArgumentDecoder> decoder = waitForMessage(MessageID(T::messageID), destinationID, timeout);
384 ASSERT(decoder->destinationID() == destinationID);
385 m_client->didReceiveMessage(this, MessageID(T::messageID), decoder.get());
389 // These three member functions are all deprecated.
391 template<typename E, typename T, typename U>
392 inline bool Connection::deprecatedSendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout)
394 uint64_t syncRequestID = 0;
395 OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID);
397 // Encode the input arguments.
398 argumentEncoder->encode(arguments);
400 // Now send the message and wait for a reply.
401 OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(messageID), syncRequestID, argumentEncoder.release(), timeout);
406 return replyDecoder->decode(const_cast<U&>(reply));
409 template<typename E, typename T>
410 bool Connection::deprecatedSend(E messageID, uint64_t destinationID, const T& arguments)
412 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
413 argumentEncoder->encode(arguments);
415 return sendMessage(MessageID(messageID), argumentEncoder.release());
418 } // namespace CoreIPC
420 #endif // Connection_h