2 * Copyright (C) 2010 Apple Inc. All rights reserved.
3 * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies)
4 * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved.
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
25 * THE POSSIBILITY OF SUCH DAMAGE.
31 #include "ArgumentDecoder.h"
32 #include "ArgumentEncoder.h"
33 #include "Arguments.h"
34 #include "BinarySemaphore.h"
35 #include "MessageID.h"
36 #include "WorkQueue.h"
37 #include <wtf/HashMap.h>
38 #include <wtf/PassRefPtr.h>
39 #include <wtf/OwnPtr.h>
40 #include <wtf/Threading.h>
43 #include <mach/mach_port.h>
63 class Connection : public ThreadSafeShared<Connection> {
65 class MessageReceiver {
67 virtual ~MessageReceiver() { }
70 virtual void didReceiveMessage(Connection*, MessageID, ArgumentDecoder*) = 0;
71 virtual SyncReplyMode didReceiveSyncMessage(Connection*, MessageID, ArgumentDecoder*, ArgumentEncoder*) { ASSERT_NOT_REACHED(); return AutomaticReply; }
74 class Client : public MessageReceiver {
79 virtual void didClose(Connection*) = 0;
80 virtual void didReceiveInvalidMessage(Connection*, MessageID) = 0;
82 // Called on the connection work queue when the connection is closed, before
83 // didCall is called on the client thread.
84 virtual void didCloseOnConnectionWorkQueue(WorkQueue*, Connection*) { }
88 typedef mach_port_t Identifier;
90 typedef HANDLE Identifier;
91 static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier);
93 typedef const QString Identifier;
95 typedef int Identifier;
98 static PassRefPtr<Connection> createServerConnection(Identifier, Client*, RunLoop* clientRunLoop);
99 static PassRefPtr<Connection> createClientConnection(Identifier, Client*, RunLoop* clientRunLoop);
103 void setShouldCloseConnectionOnMachExceptions();
109 // FIXME: This variant of send is deprecated, all clients should move to the overload that takes a message.
110 template<typename E, typename T> bool send(E messageID, uint64_t destinationID, const T& arguments);
112 template<typename T> bool send(const T& message, uint64_t destinationID);
114 static const unsigned long long NoTimeout = 10000000000ULL;
115 // FIXME: This variant of sendSync is deprecated, all clients should move to the overload that takes a message.
116 template<typename E, typename T, typename U> bool sendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout = NoTimeout);
118 template<typename T> bool sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout = NoTimeout);
120 template<typename E> PassOwnPtr<ArgumentDecoder> waitFor(E messageID, uint64_t destinationID, double timeout);
122 PassOwnPtr<ArgumentEncoder> createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID);
123 bool sendMessage(MessageID, PassOwnPtr<ArgumentEncoder>);
124 bool sendSyncReply(PassOwnPtr<ArgumentEncoder>);
127 template<typename T> class Message {
134 Message(MessageID messageID, PassOwnPtr<T> arguments)
135 : m_messageID(messageID)
136 , m_arguments(arguments.leakPtr())
140 MessageID messageID() const { return m_messageID; }
141 T* arguments() const { return m_arguments; }
143 PassOwnPtr<T> releaseArguments()
145 T* arguments = m_arguments;
152 MessageID m_messageID;
157 typedef Message<ArgumentEncoder> OutgoingMessage;
160 Connection(Identifier, bool isServer, Client*, RunLoop* clientRunLoop);
161 void platformInitialize(Identifier);
162 void platformInvalidate();
164 bool isValid() const { return m_client; }
166 PassOwnPtr<ArgumentDecoder> waitForMessage(MessageID, uint64_t destinationID, double timeout);
168 PassOwnPtr<ArgumentDecoder> sendSyncMessage(MessageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder>, double timeout);
169 PassOwnPtr<ArgumentDecoder> waitForSyncReply(uint64_t syncRequestID, double timeout);
171 // Called on the connection work queue.
172 void processIncomingMessage(MessageID, PassOwnPtr<ArgumentDecoder>);
173 bool canSendOutgoingMessages() const;
174 bool platformCanSendOutgoingMessages() const;
175 void sendOutgoingMessages();
176 bool sendOutgoingMessage(MessageID, PassOwnPtr<ArgumentEncoder>);
177 void connectionDidClose();
179 // Called on the listener thread.
180 void dispatchConnectionDidClose();
181 void dispatchMessages();
182 void dispatchSyncMessage(MessageID, ArgumentDecoder*);
186 uint64_t m_syncRequestID;
189 WorkQueue m_connectionQueue;
190 RunLoop* m_clientRunLoop;
192 // Incoming messages.
193 typedef Message<ArgumentDecoder> IncomingMessage;
195 Mutex m_incomingMessagesLock;
196 Vector<IncomingMessage> m_incomingMessages;
198 // Outgoing messages.
199 Mutex m_outgoingMessagesLock;
200 Deque<OutgoingMessage> m_outgoingMessages;
202 ThreadCondition m_waitForMessageCondition;
203 Mutex m_waitForMessageMutex;
204 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*> m_waitForMessageMap;
206 // Represents a sync request for which we're waiting on a reply.
207 struct PendingSyncReply {
209 uint64_t syncRequestID;
211 // The reply decoder, will be null if there was an error processing the sync
212 // message on the other side.
213 ArgumentDecoder* replyDecoder;
215 // Will be set to true once a reply has been received or an error occurred.
216 bool didReceiveReply;
221 , didReceiveReply(false)
225 explicit PendingSyncReply(uint64_t syncRequestID)
226 : syncRequestID(syncRequestID)
232 PassOwnPtr<ArgumentDecoder> releaseReplyDecoder()
234 OwnPtr<ArgumentDecoder> reply = adoptPtr(replyDecoder);
237 return reply.release();
241 BinarySemaphore m_waitForSyncReplySemaphore;
243 Mutex m_syncReplyStateMutex;
244 bool m_shouldWaitForSyncReplies;
245 Vector<PendingSyncReply> m_pendingSyncReplies;
246 Vector<IncomingMessage> m_syncMessagesReceivedWhileWaitingForSyncReply;
249 // Called on the connection queue.
250 void receiveSourceEventHandler();
251 void initializeDeadNameSource();
252 void exceptionSourceEventHandler();
254 mach_port_t m_sendPort;
255 mach_port_t m_receivePort;
257 // If setShouldCloseConnectionOnMachExceptions has been called, this has
258 // the exception port that exceptions from the other end will be sent on.
259 mach_port_t m_exceptionPort;
262 // Called on the connection queue.
263 void readEventHandler();
264 void writeEventHandler();
266 Vector<uint8_t> m_readBuffer;
267 OVERLAPPED m_readState;
268 OwnPtr<ArgumentEncoder> m_pendingWriteArguments;
269 OVERLAPPED m_writeState;
270 HANDLE m_connectionPipe;
272 // Called on the connection queue.
273 void readyReadHandler();
275 Vector<uint8_t> m_readBuffer;
276 size_t m_currentMessageSize;
277 QLocalSocket* m_socket;
278 QString m_serverName;
280 void readEventHandler();
281 void processCompletedMessage();
282 bool messageProcessingCompleted() { return !m_currentMessageSize; }
285 Vector<uint8_t> m_readBuffer;
286 size_t m_currentMessageSize;
287 size_t m_pendingBytes;
291 template<typename E, typename T>
292 bool Connection::send(E messageID, uint64_t destinationID, const T& arguments)
294 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
295 argumentEncoder->encode(arguments);
297 return sendMessage(MessageID(messageID), argumentEncoder.release());
300 template<typename T> bool Connection::send(const T& message, uint64_t destinationID)
302 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
303 argumentEncoder->encode(message);
305 return sendMessage(MessageID(T::messageID), argumentEncoder.release());
308 template<typename E, typename T, typename U>
309 inline bool Connection::sendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout)
311 uint64_t syncRequestID = 0;
312 OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID);
314 // Encode the input arguments.
315 argumentEncoder->encode(arguments);
317 // Now send the message and wait for a reply.
318 OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(messageID, MessageID::SyncMessage), syncRequestID, argumentEncoder.release(), timeout);
323 return replyDecoder->decode(const_cast<U&>(reply));
326 template<typename T> bool Connection::sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout)
328 uint64_t syncRequestID = 0;
329 OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID);
331 // Encode the rest of the input arguments.
332 argumentEncoder->encode(message);
334 // Now send the message and wait for a reply.
335 OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(T::messageID, MessageID::SyncMessage), syncRequestID, argumentEncoder.release(), timeout);
340 return replyDecoder->decode(const_cast<typename T::Reply&>(reply));
343 template<typename E> inline PassOwnPtr<ArgumentDecoder> Connection::waitFor(E messageID, uint64_t destinationID, double timeout)
345 return waitForMessage(MessageID(messageID), destinationID, timeout);
348 } // namespace CoreIPC
350 #endif // Connection_h