WebProcess should be terminated if invalid frameIDs are
[WebKit-https.git] / WebKit2 / Platform / CoreIPC / Connection.cpp
1 /*
2  * Copyright (C) 2010 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "Connection.h"
27
28 #include "CoreIPCMessageKinds.h"
29 #include "RunLoop.h"
30 #include "WorkItem.h"
31 #include <wtf/CurrentTime.h>
32
33 using namespace std;
34
35 namespace CoreIPC {
36
37 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
38 {
39     return adoptRef(new Connection(identifier, true, client, clientRunLoop));
40 }
41
42 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
43 {
44     return adoptRef(new Connection(identifier, false, client, clientRunLoop));
45 }
46
47 Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
48     : m_client(client)
49     , m_isServer(isServer)
50     , m_syncRequestID(0)
51     , m_isConnected(false)
52     , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
53     , m_clientRunLoop(clientRunLoop)
54     , m_inDispatchMessageCount(0)
55     , m_didReceiveInvalidMessage(false)
56     , m_shouldWaitForSyncReplies(true)
57 {
58     ASSERT(m_client);
59
60     platformInitialize(identifier);
61 }
62
63 Connection::~Connection()
64 {
65     ASSERT(!isValid());
66
67     m_connectionQueue.invalidate();
68 }
69
70 void Connection::invalidate()
71 {
72     if (!isValid()) {
73         // Someone already called invalidate().
74         return;
75     }
76     
77     // Reset the client.
78     m_client = 0;
79
80     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
81 }
82
83 void Connection::markCurrentlyDispatchedMessageAsInvalid()
84 {
85     // This should only be called while processing a message.
86     ASSERT(m_inDispatchMessageCount > 0);
87
88     m_didReceiveInvalidMessage = true;
89 }
90
91 PassOwnPtr<ArgumentEncoder> Connection::createSyncMessageArgumentEncoder(uint64_t destinationID, uint64_t& syncRequestID)
92 {
93     OwnPtr<ArgumentEncoder> argumentEncoder = ArgumentEncoder::create(destinationID);
94
95     // Encode the sync request ID.
96     syncRequestID = ++m_syncRequestID;
97     argumentEncoder->encode(syncRequestID);
98
99     return argumentEncoder.release();
100 }
101
102 bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
103 {
104     if (!isValid())
105         return false;
106
107     MutexLocker locker(m_outgoingMessagesLock);
108     m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
109     
110     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
111     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
112     return true;
113 }
114
115 bool Connection::sendSyncReply(PassOwnPtr<ArgumentEncoder> arguments)
116 {
117     return sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), arguments);
118 }
119
120 PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
121 {
122     // First, check if this message is already in the incoming messages queue.
123     {
124         MutexLocker locker(m_incomingMessagesLock);
125
126         for (size_t i = 0; i < m_incomingMessages.size(); ++i) {
127             const IncomingMessage& message = m_incomingMessages[i];
128
129             if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
130                 OwnPtr<ArgumentDecoder> arguments(message.arguments());
131                 
132                 // Erase the incoming message.
133                 m_incomingMessages.remove(i);
134                 return arguments.release();
135             }
136         }
137     }
138     
139     double absoluteTime = currentTime() + timeout;
140     
141     std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
142     
143     {
144         MutexLocker locker(m_waitForMessageMutex);
145
146         // We don't support having multiple clients wait for the same message.
147         ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
148     
149         // Insert our pending wait.
150         m_waitForMessageMap.set(messageAndDestination, 0);
151     }
152     
153     // Now wait for it to be set.
154     while (true) {
155         MutexLocker locker(m_waitForMessageMutex);
156
157         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
158         if (it->second) {
159             OwnPtr<ArgumentDecoder> arguments(it->second);
160             m_waitForMessageMap.remove(it);
161             
162             return arguments.release();
163         }
164         
165         // Now we wait.
166         if (!m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime)) {
167             // We timed out, now remove the pending wait.
168             m_waitForMessageMap.remove(messageAndDestination);
169
170             break;
171         }
172     }
173     
174     return PassOwnPtr<ArgumentDecoder>();
175 }
176
177 PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
178 {
179     // We only allow sending sync messages from the client run loop.
180     ASSERT(RunLoop::current() == m_clientRunLoop);
181
182     if (!isValid())
183         return 0;
184     
185     // Push the pending sync reply information on our stack.
186     {
187         MutexLocker locker(m_syncReplyStateMutex);
188         if (!m_shouldWaitForSyncReplies)
189             return 0;
190
191         m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
192     }
193     
194     // First send the message.
195     sendMessage(messageID, encoder);
196     
197     // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
198     // keep an extra reference to the connection here in case it's invalidated.
199     RefPtr<Connection> protect(this);
200     OwnPtr<ArgumentDecoder> reply = waitForSyncReply(syncRequestID, timeout);
201
202     // Finally, pop the pending sync reply information.
203     {
204         MutexLocker locker(m_syncReplyStateMutex);
205         ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
206         m_pendingSyncReplies.removeLast();
207
208         if (m_pendingSyncReplies.isEmpty()) {
209             // This was the bottom-most sendSyncMessage call in the stack. If we have any pending incoming
210             // sync messages, they need to be dispatched.
211             if (!m_syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) {
212                 // Add the messages.
213                 MutexLocker locker(m_incomingMessagesLock);
214                 m_incomingMessages.append(m_syncMessagesReceivedWhileWaitingForSyncReply);
215                 m_syncMessagesReceivedWhileWaitingForSyncReply.clear();
216
217                 // Schedule for the messages to be sent.
218                 m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
219             }
220         }
221     }
222     
223     return reply.release();
224 }
225
226 PassOwnPtr<ArgumentDecoder> Connection::waitForSyncReply(uint64_t syncRequestID, double timeout)
227 {
228     double absoluteTime = currentTime() + timeout;
229
230     bool timedOut = false;
231     while (!timedOut) {
232         {
233             MutexLocker locker(m_syncReplyStateMutex);
234
235             // First, check if we have any incoming sync messages that we need to process.
236             Vector<IncomingMessage> syncMessagesReceivedWhileWaitingForSyncReply;
237             m_syncMessagesReceivedWhileWaitingForSyncReply.swap(syncMessagesReceivedWhileWaitingForSyncReply);
238
239             if (!syncMessagesReceivedWhileWaitingForSyncReply.isEmpty()) {
240                 // Make sure to unlock the mutex here because we're calling out to client code which could in turn send
241                 // another sync message and we don't want that to deadlock.
242                 m_syncReplyStateMutex.unlock();
243                 
244                 for (size_t i = 0; i < syncMessagesReceivedWhileWaitingForSyncReply.size(); ++i) {
245                     IncomingMessage& message = syncMessagesReceivedWhileWaitingForSyncReply[i];
246                     OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
247
248                     dispatchSyncMessage(message.messageID(), arguments.get());
249                 }
250                 m_syncReplyStateMutex.lock();
251             }
252
253             // Second, check if there is a sync reply at the top of the stack.
254             ASSERT(!m_pendingSyncReplies.isEmpty());
255             
256             PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
257             ASSERT(pendingSyncReply.syncRequestID == syncRequestID);
258             
259             // We found the sync reply, or the connection was closed.
260             if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies)
261                 return pendingSyncReply.releaseReplyDecoder();
262         }
263
264         // We didn't find a sync reply yet, keep waiting.
265         timedOut = !m_waitForSyncReplySemaphore.wait(absoluteTime);
266     }
267
268     // We timed out.
269     return 0;
270 }
271
272 void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
273 {
274     // Check if this is a sync reply.
275     if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
276         MutexLocker locker(m_syncReplyStateMutex);
277         ASSERT(!m_pendingSyncReplies.isEmpty());
278
279         PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
280         ASSERT(pendingSyncReply.syncRequestID == arguments->destinationID());
281
282         pendingSyncReply.replyDecoder = arguments.leakPtr();
283         pendingSyncReply.didReceiveReply = true;
284
285         m_waitForSyncReplySemaphore.signal();
286         return;
287     }
288
289     // Check if this is a sync message. If it is, and we're waiting for a sync reply this message
290     // needs to be dispatched. If we don't we'll end up with a deadlock where both sync message senders are
291     // stuck waiting for a reply.
292     if (messageID.isSync()) {
293         MutexLocker locker(m_syncReplyStateMutex);
294         if (!m_pendingSyncReplies.isEmpty()) {
295             m_syncMessagesReceivedWhileWaitingForSyncReply.append(IncomingMessage(messageID, arguments));
296
297             // The message has been added, now wake up the client thread.
298             m_waitForSyncReplySemaphore.signal();
299             return;
300         }
301     }
302         
303     // Check if we're waiting for this message.
304     {
305         MutexLocker locker(m_waitForMessageMutex);
306         
307         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), arguments->destinationID()));
308         if (it != m_waitForMessageMap.end()) {
309             it->second = arguments.leakPtr();
310         
311             m_waitForMessageCondition.signal();
312             return;
313         }
314     }
315
316     MutexLocker locker(m_incomingMessagesLock);
317     m_incomingMessages.append(IncomingMessage(messageID, arguments));
318
319     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
320 }
321
322 void Connection::connectionDidClose()
323 {
324     // The connection is now invalid.
325     platformInvalidate();
326
327     {
328         MutexLocker locker(m_syncReplyStateMutex);
329
330         ASSERT(m_shouldWaitForSyncReplies);
331         m_shouldWaitForSyncReplies = false;
332
333         if (!m_pendingSyncReplies.isEmpty())
334             m_waitForSyncReplySemaphore.signal();
335     }
336
337     m_client->didCloseOnConnectionWorkQueue(&m_connectionQueue, this);
338
339     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
340 }
341
342 void Connection::dispatchConnectionDidClose()
343 {
344     // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
345     // then the client will be null here.
346     if (!m_client)
347         return;
348
349
350     // Because we define a connection as being "valid" based on wheter it has a null client, we null out
351     // the client before calling didClose here. Otherwise, sendSync will try to send a message to the connection and
352     // will then wait indefinitely for a reply.
353     Client* client = m_client;
354     m_client = 0;
355     
356     client->didClose(this);
357 }
358
359 bool Connection::canSendOutgoingMessages() const
360 {
361     return m_isConnected && platformCanSendOutgoingMessages();
362 }
363
364 void Connection::sendOutgoingMessages()
365 {
366     if (!canSendOutgoingMessages())
367         return;
368
369     while (true) {
370         OutgoingMessage message;
371         {
372             MutexLocker locker(m_outgoingMessagesLock);
373             if (m_outgoingMessages.isEmpty())
374                 break;
375             message = m_outgoingMessages.takeFirst();
376         }
377
378         if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
379             break;
380     }
381 }
382
383 void Connection::dispatchSyncMessage(MessageID messageID, ArgumentDecoder* arguments)
384 {
385     ASSERT(messageID.isSync());
386
387     // Decode the sync request ID.
388     uint64_t syncRequestID = 0;
389
390     if (!arguments->decodeUInt64(syncRequestID) || !syncRequestID) {
391         // We received an invalid sync message.
392         arguments->markInvalid();
393         return;
394     }
395
396     // Create our reply encoder.
397     ArgumentEncoder* replyEncoder = ArgumentEncoder::create(syncRequestID).leakPtr();
398     
399     // Hand off both the decoder and encoder to the client..
400     SyncReplyMode syncReplyMode = m_client->didReceiveSyncMessage(this, messageID, arguments, replyEncoder);
401
402     // FIXME: If the message was invalid, we should send back a SyncMessageError.
403     ASSERT(!arguments->isInvalid());
404
405     if (syncReplyMode == ManualReply) {
406         // The client will take ownership of the reply encoder and send it at some point in the future.
407         // We won't do anything here.
408         return;
409     }
410
411     // Send the reply.
412     sendSyncReply(replyEncoder);
413 }
414
415 void Connection::dispatchMessages()
416 {
417     Vector<IncomingMessage> incomingMessages;
418     
419     {
420         MutexLocker locker(m_incomingMessagesLock);
421         m_incomingMessages.swap(incomingMessages);
422     }
423
424     // Dispatch messages.
425     for (size_t i = 0; i < incomingMessages.size(); ++i) {
426         // If someone calls invalidate while we're invalidating messages, we should stop.
427         if (!m_client)
428             return;
429         
430         IncomingMessage& message = incomingMessages[i];
431         OwnPtr<ArgumentDecoder> arguments = message.releaseArguments();
432
433         m_inDispatchMessageCount++;
434
435         bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
436         m_didReceiveInvalidMessage = false;
437
438         if (message.messageID().isSync())
439             dispatchSyncMessage(message.messageID(), arguments.get());
440         else
441             m_client->didReceiveMessage(this, message.messageID(), arguments.get());
442
443         m_didReceiveInvalidMessage |= arguments->isInvalid();
444         m_inDispatchMessageCount--;
445
446         if (m_didReceiveInvalidMessage)
447             m_client->didReceiveInvalidMessage(this, message.messageID());
448
449         m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
450     }
451 }
452
453 } // namespace CoreIPC