2 * Copyright (C) 2010 Apple Inc. All rights reserved.
3 * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies)
4 * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved.
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
16 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
19 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
25 * THE POSSIBILITY OF SUCH DAMAGE.
31 #include "ArgumentDecoder.h"
32 #include "ArgumentEncoder.h"
33 #include "Arguments.h"
34 #include "BinarySemaphore.h"
35 #include "MessageID.h"
36 #include "WorkQueue.h"
37 #include <wtf/HashMap.h>
38 #include <wtf/PassRefPtr.h>
39 #include <wtf/OwnPtr.h>
40 #include <wtf/Threading.h>
43 #include <mach/mach_port.h>
// MESSAGE_CHECK_BASE(assertion, connection): statement-like macro (it opens with `do`).
// The part visible here calls markCurrentlyDispatchedMessageAsInvalid() on `connection`.
// NOTE(review): how `assertion` gates that call is not visible in this excerpt - confirm.
63 #define MESSAGE_CHECK_BASE(assertion, connection) do \
66 (connection)->markCurrentlyDispatchedMessageAsInvalid(); \
// A CoreIPC connection. Instances are obtained as PassRefPtr<Connection> from
// createServerConnection() / createClientConnection(); the constructor is not part
// of the public factory interface.
71 class Connection : public ThreadSafeShared<Connection> {
// Interface for objects that receive messages from a Connection.
73 class MessageReceiver {
75 virtual ~MessageReceiver() { }
// Handler for a received message. Pure virtual: every receiver must implement it.
78 virtual void didReceiveMessage(Connection*, MessageID, ArgumentDecoder*) = 0;
// Handler for a received sync message. The default implementation asserts that it is
// never reached and returns AutomaticReply, so a receiver that can be sent sync
// messages must override it.
// NOTE(review): presumably the ArgumentEncoder* is where the reply is written - confirm.
79 virtual SyncReplyMode didReceiveSyncMessage(Connection*, MessageID, ArgumentDecoder*, ArgumentEncoder*) { ASSERT_NOT_REACHED(); return AutomaticReply; }
// A MessageReceiver that is additionally told about connection-level events.
82 class Client : public MessageReceiver {
// Pure virtual notifications that every client must implement.
87 virtual void didClose(Connection*) = 0;
88 virtual void didReceiveInvalidMessage(Connection*, MessageID) = 0;
90 // Called on the connection work queue when the connection is closed, before
91 // didClose is called on the client thread.
// Optional hook: the default implementation does nothing.
92 virtual void didCloseOnConnectionWorkQueue(WorkQueue*, Connection*) { }
// Identifier is the handle type passed to createServerConnection(),
// createClientConnection() and platformInitialize().
// NOTE(review): several alternative definitions follow; the preprocessor conditionals
// that select exactly one of them per platform are not visible in this excerpt.
96 typedef mach_port_t Identifier;
98 typedef HANDLE Identifier;
// Fills in a server/client identifier pair through the two out-parameters and reports
// success as a bool.
// NOTE(review): presumably only declared for the HANDLE-based variant - confirm.
99 static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier);
101 typedef const QString Identifier;
103 typedef int Identifier;
// Factory functions. Each takes the platform Identifier, the Client that will receive
// callbacks, and the RunLoop associated with that client, and returns a new Connection.
// NOTE(review): presumably these differ only in the isServer flag handed to the
// private constructor - confirm in the implementation.
106 static PassRefPtr<Connection> createServerConnection(Identifier, Client*, RunLoop* clientRunLoop);
107 static PassRefPtr<Connection> createClientConnection(Identifier, Client*, RunLoop* clientRunLoop);
// See the comment on m_exceptionPort for the effect of calling this.
111 void setShouldCloseConnectionOnMachExceptions();
// Invoked by MESSAGE_CHECK_BASE on the connection it is given.
116 void markCurrentlyDispatchedMessageAsInvalid();
118 // FIXME: This variant of send is deprecated, all clients should move to the overload that takes a message.
// Encodes `arguments` for `destinationID` and sends them under `messageID`.
// Returns the result of sendMessage().
119 template<typename E, typename T> bool send(E messageID, uint64_t destinationID, const T& arguments);
// Preferred overload: the message type T supplies its own ID as T::messageID.
121 template<typename T> bool send(const T& message, uint64_t destinationID);
// Default value of the `timeout` parameter of both sendSync() overloads.
// NOTE(review): the unit is not visible here (presumably seconds) - confirm.
123 static const unsigned long long NoTimeout = 10000000000ULL;
124 // FIXME: This variant of sendSync is deprecated, all clients should move to the overload that takes a message.
// Sends a sync message and decodes the reply into `reply`. Although `reply` is taken by
// const reference, the definition casts the const away and writes the decoded result
// into it.
125 template<typename E, typename T, typename U> bool sendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout = NoTimeout);
// Preferred overload: T supplies T::messageID and the reply type T::Reply. `reply` is
// written to despite being a const reference, as above.
127 template<typename T> bool sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout = NoTimeout);
// Typed convenience wrapper that forwards to waitForMessage().
129 template<typename E> PassOwnPtr<ArgumentDecoder> waitFor(E messageID, uint64_t destinationID, double timeout);
// Creates the encoder for a sync message to `destinationID`; `syncRequestID` is an
// out-parameter that the caller later passes to sendSyncMessage().
131 PassOwnPtr<ArgumentEncoder> createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID);
// Both functions take ownership of the encoder (PassOwnPtr) and report success as a bool.
132 bool sendMessage(MessageID, PassOwnPtr<ArgumentEncoder>);
133 bool sendSyncReply(PassOwnPtr<ArgumentEncoder>);
// Pairs a MessageID with a pointer to its arguments (an ArgumentEncoder or
// ArgumentDecoder in the typedefs that use this template).
136 template<typename T> class Message {
// Takes the arguments out of the PassOwnPtr with leakPtr() and stores the raw pointer,
// so from here on the pointer is managed manually by this class and its users.
143 Message(MessageID messageID, PassOwnPtr<T> arguments)
144 : m_messageID(messageID)
145 , m_arguments(arguments.leakPtr())
149 MessageID messageID() const { return m_messageID; }
// Returns the stored pointer without transferring ownership.
150 T* arguments() const { return m_arguments; }
// Hands the stored arguments back as a PassOwnPtr.
// NOTE(review): only the first statement of the body is visible in this excerpt.
152 PassOwnPtr<T> releaseArguments()
154 T* arguments = m_arguments;
161 MessageID m_messageID;
// Message flavour held in m_outgoingMessages.
166 typedef Message<ArgumentEncoder> OutgoingMessage;
// Constructor used when creating a connection; `isServer` distinguishes the two ends.
169 Connection(Identifier, bool isServer, Client*, RunLoop* clientRunLoop);
// Platform-specific setup and teardown hooks.
170 void platformInitialize(Identifier);
171 void platformInvalidate();
// The connection is valid for as long as m_client converts to true.
173 bool isValid() const { return m_client; }
// Back end of waitFor().
175 PassOwnPtr<ArgumentDecoder> waitForMessage(MessageID, uint64_t destinationID, double timeout);
// Back end of sendSync(): takes ownership of the encoder and returns the reply decoder.
177 PassOwnPtr<ArgumentDecoder> sendSyncMessage(MessageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder>, double timeout);
178 PassOwnPtr<ArgumentDecoder> waitForSyncReply(uint64_t syncRequestID, double timeout);
180 // Called on the connection work queue.
181 void processIncomingMessage(MessageID, PassOwnPtr<ArgumentDecoder>);
182 bool canSendOutgoingMessages() const;
183 bool platformCanSendOutgoingMessages() const;
184 void sendOutgoingMessages();
185 bool sendOutgoingMessage(MessageID, PassOwnPtr<ArgumentEncoder>);
186 void connectionDidClose();
188 // Called on the listener thread.
189 void dispatchConnectionDidClose();
190 void dispatchMessages();
191 void dispatchSyncMessage(MessageID, ArgumentDecoder*);
// NOTE(review): presumably the source of the IDs handed out by
// createSyncMessageArgumentEncoder() - confirm.
195 uint64_t m_syncRequestID;
// Work queue referred to by the "Called on the connection work queue" comments.
198 WorkQueue m_connectionQueue;
// Run loop supplied to the constructor as clientRunLoop.
199 RunLoop* m_clientRunLoop;
201 uint32_t m_inDispatchMessageCount;
202 bool m_didReceiveInvalidMessage;
204 // Incoming messages.
205 typedef Message<ArgumentDecoder> IncomingMessage;
// NOTE(review): presumably guards m_incomingMessages - confirm.
207 Mutex m_incomingMessagesLock;
208 Vector<IncomingMessage> m_incomingMessages;
210 // Outgoing messages.
// NOTE(review): presumably guards m_outgoingMessages - confirm.
211 Mutex m_outgoingMessagesLock;
212 Deque<OutgoingMessage> m_outgoingMessages;
// State used by waitForMessage(). The map values are raw ArgumentDecoder pointers.
// NOTE(review): the key looks like a (message ID, destination ID) pair - confirm.
214 ThreadCondition m_waitForMessageCondition;
215 Mutex m_waitForMessageMutex;
216 HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*> m_waitForMessageMap;
218 // Represents a sync request for which we're waiting on a reply.
219 struct PendingSyncReply {
// ID of the sync request this entry belongs to.
221 uint64_t syncRequestID;
223 // The reply decoder, will be null if there was an error processing the sync
224 // message on the other side.
225 ArgumentDecoder* replyDecoder;
227 // Will be set to true once a reply has been received or an error occurred.
228 bool didReceiveReply;
// Tail of a constructor initializer list whose beginning is not visible in this
// excerpt; the entry starts out with no reply received.
233 , didReceiveReply(false)
// Creates a pending entry for the given request ID.
// NOTE(review): the rest of this initializer list is not visible in this excerpt.
237 explicit PendingSyncReply(uint64_t syncRequestID)
238 : syncRequestID(syncRequestID)
// Wraps the raw replyDecoder pointer in an OwnPtr and transfers ownership to the caller.
244 PassOwnPtr<ArgumentDecoder> releaseReplyDecoder()
246 OwnPtr<ArgumentDecoder> reply = adoptPtr(replyDecoder);
249 return reply.release();
// State for outstanding sync requests.
// NOTE(review): presumably signalled when a sync reply or an incoming sync message
// arrives - confirm.
253 BinarySemaphore m_waitForSyncReplySemaphore;
// NOTE(review): presumably guards the three members declared below - confirm.
255 Mutex m_syncReplyStateMutex;
256 bool m_shouldWaitForSyncReplies;
// One PendingSyncReply per sync request that is still waiting for its reply.
257 Vector<PendingSyncReply> m_pendingSyncReplies;
// Sync messages that arrived while this side was itself waiting for a sync reply.
258 Vector<IncomingMessage> m_syncMessagesReceivedWhileWaitingForSyncReply;
// Members based on Mach ports (mach_port_t).
// NOTE(review): the platform guard around this section is not visible in this excerpt.
261 // Called on the connection queue.
262 void receiveSourceEventHandler();
263 void initializeDeadNameSource();
264 void exceptionSourceEventHandler();
// Ports used for sending and receiving.
266 mach_port_t m_sendPort;
267 mach_port_t m_receivePort;
269 // If setShouldCloseConnectionOnMachExceptions has been called, this has
270 // the exception port that exceptions from the other end will be sent on.
271 mach_port_t m_exceptionPort;
// Members based on a HANDLE pipe with OVERLAPPED read and write state.
// NOTE(review): the platform guard around this section is not visible in this excerpt.
274 // Called on the connection queue.
275 void readEventHandler();
276 void writeEventHandler();
// Buffer and OVERLAPPED state for reads.
278 Vector<uint8_t> m_readBuffer;
279 OVERLAPPED m_readState;
// Encoder owned by the connection, together with the OVERLAPPED state for writes.
// NOTE(review): presumably the arguments of a write still in flight - confirm.
280 OwnPtr<ArgumentEncoder> m_pendingWriteArguments;
281 OVERLAPPED m_writeState;
282 HANDLE m_connectionPipe;
// Members based on Qt types (QLocalSocket, QString).
// NOTE(review): the platform guard around this section is not visible in this excerpt.
284 // Called on the connection queue.
285 void readyReadHandler();
287 Vector<uint8_t> m_readBuffer;
// NOTE(review): presumably the size of the message currently being read - confirm.
288 size_t m_currentMessageSize;
289 QLocalSocket* m_socket;
290 QString m_serverName;
// A further platform-specific section.
// NOTE(review): which platform this belongs to, and its guard, are not visible in this
// excerpt.
292 void readEventHandler();
293 void processCompletedMessage();
// True when m_currentMessageSize is zero.
294 bool messageProcessingCompleted() { return !m_currentMessageSize; }
297 Vector<uint8_t> m_readBuffer;
298 size_t m_currentMessageSize;
299 size_t m_pendingBytes;
// Deprecated overload of send() (see the FIXME at its declaration).
// Creates an encoder addressed to `destinationID`, encodes `arguments` into it, and
// passes it to sendMessage() under `messageID`. Returns sendMessage()'s result.
303 template<typename E, typename T>
304 bool Connection::send(E messageID, uint64_t destinationID, const T& arguments)
306 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
307 argumentEncoder->encode(arguments);
// release() transfers ownership of the encoder to sendMessage().
309 return sendMessage(MessageID(messageID), argumentEncoder.release());
// Sends `message` to `destinationID`. The message ID is taken from T::messageID and the
// message encodes itself into the encoder. Returns sendMessage()'s result.
312 template<typename T> bool Connection::send(const T& message, uint64_t destinationID)
314 OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
315 argumentEncoder->encode(message);
// release() transfers ownership of the encoder to sendMessage().
317 return sendMessage(MessageID(T::messageID), argumentEncoder.release());
// Deprecated overload of sendSync() (see the FIXME at its declaration).
// Encodes `arguments`, sends them as a sync message, and decodes the reply into `reply`.
// Returns the result of decoding the reply.
320 template<typename E, typename T, typename U>
321 inline bool Connection::sendSync(E messageID, uint64_t destinationID, const T& arguments, const U& reply, double timeout)
// createSyncMessageArgumentEncoder() fills in syncRequestID through its reference
// parameter.
323 uint64_t syncRequestID = 0;
324 OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID);
326 // Encode the input arguments.
327 argumentEncoder->encode(arguments);
329 // Now send the message and wait for a reply.
330 OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(messageID, MessageID::SyncMessage), syncRequestID, argumentEncoder.release(), timeout);
// NOTE(review): no null check on replyDecoder is visible in this excerpt before it is
// dereferenced - confirm that a guard exists.
// `reply` is declared const but is the output: the const_cast lets decode() write to it.
335 return replyDecoder->decode(const_cast<U&>(reply));
// Sends `message` as a sync message (ID taken from T::messageID) and decodes the reply
// into `reply`. Returns the result of decoding the reply.
338 template<typename T> bool Connection::sendSync(const T& message, const typename T::Reply& reply, uint64_t destinationID, double timeout)
// createSyncMessageArgumentEncoder() fills in syncRequestID through its reference
// parameter.
340 uint64_t syncRequestID = 0;
341 OwnPtr<ArgumentEncoder> argumentEncoder = createSyncMessageArgumentEncoder(destinationID, syncRequestID);
343 // Encode the rest of the input arguments.
344 argumentEncoder->encode(message);
346 // Now send the message and wait for a reply.
347 OwnPtr<ArgumentDecoder> replyDecoder = sendSyncMessage(MessageID(T::messageID, MessageID::SyncMessage), syncRequestID, argumentEncoder.release(), timeout);
// NOTE(review): no null check on replyDecoder is visible in this excerpt before it is
// dereferenced - confirm that a guard exists.
// `reply` is declared const but is the output: the const_cast lets decode() write to it.
352 return replyDecoder->decode(const_cast<typename T::Reply&>(reply));
// Typed convenience wrapper: converts `messageID` to a MessageID and forwards to
// waitForMessage(), returning whatever decoder it returns.
355 template<typename E> inline PassOwnPtr<ArgumentDecoder> Connection::waitFor(E messageID, uint64_t destinationID, double timeout)
357 return waitForMessage(MessageID(messageID), destinationID, timeout);
360 } // namespace CoreIPC
362 #endif // Connection_h