f1f1b696d4e18a042f6286ef3c4f809f10d676a5
[WebKit-https.git] / WebKit2 / Platform / CoreIPC / Connection.cpp
1 /*
2  * Copyright (C) 2010 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "Connection.h"
27
28 #include "CoreIPCMessageKinds.h"
29 #include "RunLoop.h"
30 #include "WorkItem.h"
31 #include <wtf/CurrentTime.h>
32
33 using namespace std;
34
35 namespace CoreIPC {
36
37 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
38 {
39     return adoptRef(new Connection(identifier, true, client, clientRunLoop));
40 }
41
42 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
43 {
44     return adoptRef(new Connection(identifier, false, client, clientRunLoop));
45 }
46
47 Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
48     : m_client(client)
49     , m_isServer(isServer)
50     , m_syncRequestID(0)
51     , m_isConnected(false)
52     , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
53     , m_clientRunLoop(clientRunLoop)
54     , m_shouldWaitForSyncReplies(true)
55 {
56     ASSERT(m_client);
57
58     platformInitialize(identifier);
59 }
60
61 Connection::~Connection()
62 {
63     ASSERT(!isValid());
64
65     m_connectionQueue.invalidate();
66 }
67
68 void Connection::invalidate()
69 {
70     if (!isValid()) {
71         // Someone already called invalidate().
72         return;
73     }
74     
75     // Reset the client.
76     m_client = 0;
77
78     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
79 }
80
81 PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID)
82 {
83     OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
84
85     // Encode the sync request ID.
86     syncRequestID = ++m_syncRequestID;
87     argumentEncoder->encode(syncRequestID);
88
89     return argumentEncoder.release();
90 }
91
92 bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
93 {
94     if (!isValid())
95         return false;
96
97     MutexLocker locker(m_outgoingMessagesLock);
98     m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
99     
100     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
101     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
102     return true;
103 }
104
105 bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)
106 {
107     return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments);
108 }
109
110 PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
111 {
112     // First, check if this message is already in the incoming messages queue.
113     {
114         MutexLocker locker(m_incomingMessagesLock);
115
116         for (size_t i = 0; i < m_incomingMessages.size(); ++i) {
117             const IncomingMessage& message = m_incomingMessages[i];
118
119             if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
120                 OwnPtr<ArgumentDecoder> arguments(message.arguments());
121                 
122                 // Erase the incoming message.
123                 m_incomingMessages.remove(i);
124                 return arguments.release();
125             }
126         }
127     }
128     
129     double absoluteTime = currentTime() + timeout;
130     
131     std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
132     
133     {
134         MutexLocker locker(m_waitForMessageMutex);
135
136         // We don't support having multiple clients wait for the same message.
137         ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
138     
139         // Insert our pending wait.
140         m_waitForMessageMap.set(messageAndDestination, 0);
141     }
142     
143     // Now wait for it to be set.
144     while (true) {
145         MutexLocker locker(m_waitForMessageMutex);
146
147         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
148         if (it->second) {
149             OwnPtr<ArgumentDecoder> arguments(it->second);
150             m_waitForMessageMap.remove(it);
151             
152             return arguments.release();
153         }
154         
155         // Now we wait.
156         if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) {
157             // We timed out, now remove the pending wait.
158             m_waitForMessageMap.remove(messageAndDestination);
159
160             break;
161         }
162     }
163     
164     return PassOwnPtr<ArgumentDecoder>();
165 }
166
167 PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
168 {
169     // We only allow sending sync messages from the client run loop.
170     ASSERT(RunLoop::current() == m_clientRunLoop);
171
172     if (!isValid())
173         return 0;
174     
175     // Push the pending sync reply information on our stack.
176     {
177         MutexLocker locker(m_syncReplyStateMutex);
178         if (!m_shouldWaitForSyncReplies)
179             return 0;
180
181         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
182     }
183     
184     // First send the message.
185     sendMessage(messageID, encoder);
186     
187     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
188     // keep an extra reference to the connection here in case it's invalidated.
189     RefPtr<Connection> protect(this);
190     OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout);
191
192     // Finally, pop the pending sync reply information.
193     {
194         MutexLocker locker(m_syncReplyStateMutex);
195         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
196         m_pendingSyncReplies.removeLast();
197
198         if (m_pendingSyncReplies.isEmpty()) {
199             // This was the bottom-most sendSyncMessage call in the stack. If we have any pending incoming
200             // sync messages, they need to be dispatched.
201             if (!m_syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) {
202                 // Add the messages.
203                 MutexLocker locker(m_incomingMessagesLock);
204                 m_incomingMessages.append(m_syncMessagesReceivedWhileWaitingForSyncReply);
205                 m_syncMessagesReceivedWhileWaitingForSyncReply.clear();
206
207                 // Schedule for the messages to be sent.
208                 m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
209             }
210         }
211     }
212     
213     return reply.release();
214 }
215
216 PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout)
217 {
218     double absoluteTime = currentTime() + timeout;
219
220     bool timedOut = false;
221     while (!timedOut) {
222         {
223             MutexLocker locker(m_syncReplyStateMutex);
224
225             // First, check if we have any incoming sync messages that we need to process.
226             Vector<IncomingMessage> syncMessagesReceivedWhileWaitingForSyncReply;
227             m_syncMessagesReceivedWhileWaitingForSyncReply.swap(syncMessagesReceivedWhileWaitingForSyncReply);
228
229             if (!syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) {
230                 // Make sure to unlock the mutex here because we're calling out to client code which could in turn send
231                 // another sync message and we don't want that to deadlock.
232                 m_syncReplyStateMutex.unlock();
233                 
234                 for (size_t i = 0; i < syncMessagesReceivedWhileWaitingForSyncReply.size(); ++i) {
235                     IncomingMessage& message = syncMessagesReceivedWhileWaitingForSyncReply[i];
236                     OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
237
238                     dispatchSyncMessage(message.messageID(), arguments.get());
239                 }
240                 m_syncReplyStateMutex.lock();
241             }
242
243             // Second, check if there is a sync reply at the top of the stack.
244             ASSERT(!m_pendingSyncReplies.isEmpty());
245             
246             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
247             ASSERT(pendingSyncReply.syncRequestID == syncRequestID);
248             
249             // We found the sync reply, or the connection was closed.
250             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
251                 return pendingSyncReply.releaseReplyDecoder();
252         }
253
254         // We didn't find a sync reply yet, keep waiting.
255         timedOut = !m_waitForSyncReplySemaphore.wait(absoluteTime);
256     }
257
258     // We timed out.
259     return 0;
260 }
261
262 void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
263 {
264     // Check if this is a sync reply.
265     if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
266         MutexLocker locker(m_syncReplyStateMutex);
267         ASSERT(!m_pendingSyncReplies.isEmpty());
268
269         PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
270         ASSERT(pendingSyncReply.syncRequestID == arguments->destinationID());
271
272         pendingSyncReply.replyDecoder = arguments.leakPtr();
273         pendingSyncReply.didReceiveReply = true;
274
275         m_waitForSyncReplySemaphore.signal();
276         return;
277     }
278
279     // Check if this is a sync message. If it is, and we're waiting for a sync reply this message
280     // needs to be dispatched. If we don't we'll end up with a deadlock where both sync message senders are
281     // stuck waiting for a reply.
282     if (messageID.isSync()) {
283         MutexLocker locker(m_syncReplyStateMutex);
284         if (!m_pendingSyncReplies.isEmpty()) {
285             m_syncMessagesReceivedWhileWaitingForSyncReply.append(IncomingMessage(messageID, arguments));
286
287             // The message has been added, now wake up the client thread.
288             m_waitForSyncReplySemaphore.signal();
289             return;
290         }
291     }
292         
293     // Check if we're waiting for this message.
294     {
295         MutexLocker locker(m_waitForMessageMutex);
296         
297         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), arguments->destinationID()));
298         if (it != m_waitForMessageMap.end()) {
299             it->second = arguments.leakPtr();
300         
301             m_waitForMessageCondition.signal();
302             return;
303         }
304     }
305
306     MutexLocker locker(m_incomingMessagesLock);
307     m_incomingMessages.append(IncomingMessage(messageID, arguments));
308
309     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
310 }
311
312 void Connection::connectionDidClose()
313 {
314     // The connection is now invalid.
315     platformInvalidate();
316
317     {
318         MutexLocker locker(m_syncReplyStateMutex);
319
320         ASSERT(m_shouldWaitForSyncReplies);
321         m_shouldWaitForSyncReplies = false;
322
323         if (!m_pendingSyncReplies.isEmpty())
324             m_waitForSyncReplySemaphore.signal();
325     }
326
327     m_client->didCloseOnConnectionWorkQueue(&m_connectionQueue, this);
328
329     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
330 }
331
332 void Connection::dispatchConnectionDidClose()
333 {
334     // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
335     // then the client will be null here.
336     if (!m_client)
337         return;
338
339
340     // Because we define a connection as being "valid" based on wheter it has a null client, we null out
341     // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
342     // will then wait indefinitely for a reply.
343     Client* client = m_client;
344     m_client = 0;
345     
346     client->didClose(this);
347 }
348
349 bool Connection::canSendOutgoingMessages() const
350 {
351     return m_isConnected && platformCanSendOutgoingMessages();
352 }
353
354 void Connection::sendOutgoingMessages()
355 {
356     if (!canSendOutgoingMessages())
357         return;
358
359     while (true) {
360         OutgoingMessage message;
361         {
362             MutexLocker locker(m_outgoingMessagesLock);
363             if (m_outgoingMessages.isEmpty())
364                 break;
365             message = m_outgoingMessages.takeFirst();
366         }
367
368         if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
369             break;
370     }
371 }
372
373 void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments)
374 {
375     ASSERT(messageID.isSync());
376
377     // Decode the sync request ID.
378     uint64_t syncRequestID = 0;
379
380     if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) {
381         // We received an invalid sync message.
382         arguments->markInvalid();
383         return;
384     }
385
386     // Create our reply encoder.
387     ArgumentEncoder* replyEncoder = ArgumentEncoder::create(syncRequestID).leakPtr();
388     
389     // Hand off both the decoder and encoder to the client..
390     SyncReplyMode syncReplyMode = m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder);
391
392     // FIXME: If the message was invalid, we should send back a SyncMessageError.
393     ASSERT(!arguments->isInvalid());
394
395     if (syncReplyMode == ManualReply) {
396         // The client will take ownership of the reply encoder and send it at some point in the future.
397         // We won't do anything here.
398         return;
399     }
400
401     // Send the reply.
402     sendSyncReply(replyEncoder);
403 }
404
405 void Connection::dispatchMessages()
406 {
407     Vector<IncomingMessage> incomingMessages;
408     
409     {
410         MutexLocker locker(m_incomingMessagesLock);
411         m_incomingMessages.swap(incomingMessages);
412     }
413
414     // Dispatch messages.
415     for (size_t i = 0; i < incomingMessages.size(); ++i) {
416         // If someone calls invalidate while we're invalidating messages, we should stop.
417         if (!m_client)
418             return;
419         
420         IncomingMessage& message = incomingMessages[i];
421         OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
422
423         if (message.messageID().isSync())
424             dispatchSyncMessage(message.messageID(), arguments.get());
425         else
426             m_client->didReceiveMessage(this, message.messageID(), arguments.get());
427
428         if (arguments->isInvalid())
429             m_client->didReceiveInvalidMessage(this, message.messageID());
430     }
431 }
432
433 } // namespace CoreIPC