e1d009ec841256e3b8d9583350c2a61c3fafdc8d
[WebKit-https.git] / WebKit2 / Platform / CoreIPC / Connection.cpp
1 /*
2  * Copyright (C) 2010 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "Connection.h"
27
28 #include "CoreIPCMessageKinds.h"
29 #include "RunLoop.h"
30 #include "WorkItem.h"
31 #include <wtf/CurrentTime.h>
32
33 using namespace std;
34
35 namespace CoreIPC {
36
37 PassRefPtr<Connection> Connection::createServerConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
38 {
39     return adoptRef(new Connection(identifier, true, client, clientRunLoop));
40 }
41
42 PassRefPtr<Connection> Connection::createClientConnection(Identifier identifier, Client* client, RunLoop* clientRunLoop)
43 {
44     return adoptRef(new Connection(identifier, false, client, clientRunLoop));
45 }
46
47 Connection::Connection(Identifier identifier, bool isServer, Client* client, RunLoop* clientRunLoop)
48     : m_client(client)
49     , m_isServer(isServer)
50     , m_syncRequestID(0)
51     , m_isConnected(false)
52     , m_connectionQueue("com.apple.CoreIPC.ReceiveQueue")
53     , m_clientRunLoop(clientRunLoop)
54 {
55     ASSERT(m_client);
56
57     platformInitialize(identifier);
58 }
59
60 Connection::~Connection()
61 {
62     ASSERT(!isValid());
63
64     m_connectionQueue.invalidate();
65 }
66
67 void Connection::invalidate()
68 {
69     if (!isValid()) {
70         // Someone already called invalidate().
71         return;
72     }
73     
74     // Reset the client.
75     m_client = 0;
76
77     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::platformInvalidate));
78 }
79
80 bool Connection::sendMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
81 {
82     if (!isValid())
83         return false;
84
85     MutexLocker locker(m_outgoingMessagesLock);
86     m_outgoingMessages.append(OutgoingMessage(messageID, arguments));
87     
88     // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
89     m_connectionQueue.scheduleWork(WorkItem::create(this, &Connection::sendOutgoingMessages));
90     return true;
91 }
92
93 PassOwnPtr<ArgumentDecoder> Connection::waitForMessage(MessageID messageID, uint64_t destinationID, double timeout)
94 {
95     // First, check if this message is already in the incoming messages queue.
96     {
97         MutexLocker locker(m_incomingMessagesLock);
98
99         for (size_t i = 0; i < m_incomingMessages.size(); ++i) {
100             const IncomingMessage& message = m_incomingMessages[i];
101
102             if (message.messageID() == messageID && message.arguments()->destinationID() == destinationID) {
103                 OwnPtr<ArgumentDecoder> arguments(message.arguments());
104                 
105                 // Erase the incoming message.
106                 m_incomingMessages.remove(i);
107                 return arguments.release();
108             }
109         }
110     }
111     
112     double absoluteTime = currentTime() + timeout;
113     
114     std::pair<unsigned, uint64_t> messageAndDestination(std::make_pair(messageID.toInt(), destinationID));
115     
116     {
117         MutexLocker locker(m_waitForMessageMutex);
118
119         // We don't support having multiple clients wait for the same message.
120         ASSERT(!m_waitForMessageMap.contains(messageAndDestination));
121     
122         // Insert our pending wait.
123         m_waitForMessageMap.set(messageAndDestination, 0);
124     }
125     
126     bool timedOut = false;
127     
128     // Now wait for it to be set.
129     while (!timedOut) {
130         MutexLocker locker(m_waitForMessageMutex);
131
132         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(messageAndDestination);
133         if (it->second) {
134             OwnPtr<ArgumentDecoder> arguments(it->second);
135             m_waitForMessageMap.remove(it);
136             
137             return arguments.release();
138         }
139         
140         // We didn't find it, keep waiting.
141         timedOut = !m_waitForMessageCondition.timedWait(m_waitForMessageMutex, absoluteTime);
142     }
143
144     // We timed out, now remove the pending wait.
145     {
146         MutexLocker locker(m_waitForMessageMutex);
147         m_waitForMessageMap.remove(messageAndDestination);
148     }
149     
150     return PassOwnPtr<ArgumentDecoder>();
151 }
152
153 PassOwnPtr<ArgumentDecoder> Connection::sendSyncMessage(MessageID messageID, uint64_t syncRequestID, PassOwnPtr<ArgumentEncoder> encoder, double timeout)
154 {
155     // First send the message.
156     if (!sendMessage(messageID, encoder))
157         return PassOwnPtr<ArgumentDecoder>();
158
159     // Now wait for a reply and return it.
160     return waitForMessage(MessageID(CoreIPCMessage::SyncMessageReply), syncRequestID, timeout);
161 }
162
163 void Connection::processIncomingMessage(MessageID messageID, PassOwnPtr<ArgumentDecoder> arguments)
164 {
165     // First, check if we're waiting for this message.
166     {
167         MutexLocker locker(m_waitForMessageMutex);
168         
169         HashMap<std::pair<unsigned, uint64_t>, ArgumentDecoder*>::iterator it = m_waitForMessageMap.find(std::make_pair(messageID.toInt(), arguments->destinationID()));
170         if (it != m_waitForMessageMap.end()) {
171             it->second = arguments.leakPtr();
172         
173             m_waitForMessageCondition.signal();
174             return;
175         }
176     }
177
178     if (messageID == MessageID(CoreIPCMessage::SyncMessageReply)) {
179         // FIXME: We got a reply for another sync message someone sent, handle this.
180         ASSERT_NOT_REACHED();
181     }
182
183     MutexLocker locker(m_incomingMessagesLock);
184     m_incomingMessages.append(IncomingMessage(messageID, arguments));
185
186     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchMessages));
187 }
188
189 void Connection::connectionDidClose()
190 {
191     // The connection is now invalid.
192     platformInvalidate();
193     
194     m_clientRunLoop->scheduleWork(WorkItem::create(this, &Connection::dispatchConnectionDidClose));
195 }
196
197 void Connection::dispatchConnectionDidClose()
198 {
199     // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
200     // then the client will be null here.
201     if (!m_client)
202         return;
203
204     m_client->didClose(this);
205     
206     // Reset the client.
207     m_client = 0;
208 }
209
210 bool Connection::canSendOutgoingMessages() const
211 {
212     return m_isConnected && platformCanSendOutgoingMessages();
213 }
214
215 void Connection::sendOutgoingMessages()
216 {
217     if (!canSendOutgoingMessages())
218         return;
219
220     while (true) {
221         OutgoingMessage message;
222         {
223             MutexLocker locker(m_outgoingMessagesLock);
224             if (m_outgoingMessages.isEmpty())
225                 break;
226             message = m_outgoingMessages.takeFirst();
227         }
228
229         if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
230             break;
231     }
232 }
233
234 void Connection::dispatchMessages()
235 {
236     Vector<IncomingMessage> incomingMessages;
237     
238     {
239         MutexLocker locker(m_incomingMessagesLock);
240         m_incomingMessages.swap(incomingMessages);
241     }
242
243     // Dispatch messages.
244     for (size_t i = 0; i < incomingMessages.size(); ++i) {
245         IncomingMessage& message = incomingMessages[i];
246         ArgumentDecoder* arguments = message.arguments();
247
248         if (message.messageID().isSync()) {
249             // Decode the sync request ID.
250             uint64_t syncRequestID = 0;
251
252             if (!arguments->decodeUInt64(syncRequestID)) {
253                 // FIXME: Handle this case.
254                 ASSERT_NOT_REACHED();
255             }
256
257             // Create our reply encoder.
258             OwnPtr<ArgumentEncoder> replyEncoder(new ArgumentEncoder(syncRequestID));
259             
260             // Hand off both the decoder and encoder to the client..
261             m_client->didReceiveSyncMessage(this, message.messageID(), arguments, replyEncoder.get());
262             
263             // Send the reply.
264             sendMessage(MessageID(CoreIPCMessage::SyncMessageReply), replyEncoder.release());
265         } else
266             m_client->didReceiveMessage(this, message.messageID(), arguments);
267
268         message.destroy();
269     }
270 }
271
272 } // namespace CoreIPC