2010-01-13 Fumitoshi Ukai <ukai@chromium.org>
[WebKit-https.git] / WebCore / websockets / WorkerThreadableWebSocketChannel.cpp
1 /*
2  * Copyright (C) 2009 Google Inc.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #include "config.h"
32
33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
34
35 #include "WorkerThreadableWebSocketChannel.h"
36
37 #include "GenericWorkerTask.h"
38 #include "PlatformString.h"
39 #include "ScriptExecutionContext.h"
40 #include "ThreadableWebSocketChannelClientWrapper.h"
41 #include "WebSocketChannel.h"
42 #include "WebSocketChannelClient.h"
43 #include "WorkerContext.h"
44 #include "WorkerLoaderProxy.h"
45 #include "WorkerRunLoop.h"
46 #include "WorkerThread.h"
47
48 #include <wtf/PassRefPtr.h>
49
50 namespace WebCore {
51
52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol)
53     : m_workerContext(context)
54     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
55     , m_bridge(new Bridge(m_workerClientWrapper, m_workerContext, taskMode, url, protocol))
56 {
57 }
58
59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
60 {
61     if (m_bridge)
62         m_bridge->disconnect();
63 }
64
65 void WorkerThreadableWebSocketChannel::connect()
66 {
67     if (m_bridge)
68         m_bridge->connect();
69 }
70
71 bool WorkerThreadableWebSocketChannel::send(const String& message)
72 {
73     if (!m_bridge)
74         return false;
75     return m_bridge->send(message);
76 }
77
78 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
79 {
80     if (!m_bridge)
81         return 0;
82     return m_bridge->bufferedAmount();
83 }
84
85 void WorkerThreadableWebSocketChannel::close()
86 {
87     if (!m_bridge)
88         m_bridge->close();
89 }
90
91 void WorkerThreadableWebSocketChannel::disconnect()
92 {
93     m_bridge->disconnect();
94     m_bridge.clear();
95 }
96
97 WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadSafeShared<ThreadableWebSocketChannelClientWrapper> > clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol)
98     : m_workerClientWrapper(clientWrapper)
99     , m_loaderProxy(loaderProxy)
100     , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol))
101     , m_taskMode(taskMode)
102 {
103     ASSERT(isMainThread());
104 }
105
106 WorkerThreadableWebSocketChannel::Peer::~Peer()
107 {
108     ASSERT(isMainThread());
109     if (m_mainWebSocketChannel)
110         m_mainWebSocketChannel->disconnect();
111 }
112
113 void WorkerThreadableWebSocketChannel::Peer::connect()
114 {
115     ASSERT(isMainThread());
116     if (!m_mainWebSocketChannel)
117         return;
118     m_mainWebSocketChannel->connect();
119 }
120
121 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadSafeShared<ThreadableWebSocketChannelClientWrapper> > workerClientWrapper, bool sent)
122 {
123     ASSERT_UNUSED(context, context->isWorkerContext());
124     static_cast<ThreadableWebSocketChannelClientWrapper*>(workerClientWrapper.get())->setSent(sent);
125 }
126
127 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
128 {
129     ASSERT(isMainThread());
130     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
131         return;
132     bool sent = m_mainWebSocketChannel->send(message);
133     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode);
134 }
135
136 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadSafeShared<ThreadableWebSocketChannelClientWrapper> > workerClientWrapper, unsigned long bufferedAmount)
137 {
138     ASSERT_UNUSED(context, context->isWorkerContext());
139     static_cast<ThreadableWebSocketChannelClientWrapper*>(workerClientWrapper.get())->setBufferedAmount(bufferedAmount);
140 }
141
142 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
143 {
144     ASSERT(isMainThread());
145     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
146         return;
147     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
148     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
149 }
150
151 void WorkerThreadableWebSocketChannel::Peer::close()
152 {
153     ASSERT(isMainThread());
154     if (!m_mainWebSocketChannel)
155         return;
156     m_mainWebSocketChannel->close();
157     m_mainWebSocketChannel = 0;
158 }
159
160 void WorkerThreadableWebSocketChannel::Peer::disconnect()
161 {
162     ASSERT(isMainThread());
163     if (!m_mainWebSocketChannel)
164         return;
165     m_mainWebSocketChannel->disconnect();
166     m_mainWebSocketChannel = 0;
167 }
168
169 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadSafeShared<ThreadableWebSocketChannelClientWrapper> > workerClientWrapper)
170 {
171     ASSERT_UNUSED(context, context->isWorkerContext());
172     static_cast<ThreadableWebSocketChannelClientWrapper*>(workerClientWrapper.get())->didConnect();
173 }
174
175 void WorkerThreadableWebSocketChannel::Peer::didConnect()
176 {
177     ASSERT(isMainThread());
178     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode);
179 }
180
181 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadSafeShared<ThreadableWebSocketChannelClientWrapper> > workerClientWrapper, const String& message)
182 {
183     ASSERT_UNUSED(context, context->isWorkerContext());
184     static_cast<ThreadableWebSocketChannelClientWrapper*>(workerClientWrapper.get())->didReceiveMessage(message);
185 }
186
187 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
188 {
189     ASSERT(isMainThread());
190     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
191 }
192
193 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadSafeShared<ThreadableWebSocketChannelClientWrapper> > workerClientWrapper)
194 {
195     ASSERT_UNUSED(context, context->isWorkerContext());
196     static_cast<ThreadableWebSocketChannelClientWrapper*>(workerClientWrapper.get())->didClose();
197 }
198
199 void WorkerThreadableWebSocketChannel::Peer::didClose()
200 {
201     ASSERT(isMainThread());
202     m_mainWebSocketChannel = 0;
203     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper), m_taskMode);
204 }
205
206 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, RefPtr<ThreadSafeShared<ThreadableWebSocketChannelClientWrapper> > workerClientWrapper)
207 {
208     ASSERT_UNUSED(context, context->isWorkerContext());
209     thisPtr->m_peer = peer;
210     static_cast<ThreadableWebSocketChannelClientWrapper*>(workerClientWrapper.get())->setSyncMethodDone();
211 }
212
213 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, RefPtr<ThreadSafeShared<ThreadableWebSocketChannelClientWrapper> > clientWrapper, const String& taskMode, const KURL& url, const String& protocol)
214 {
215     ASSERT(isMainThread());
216     ASSERT_UNUSED(context, context->isDocument());
217
218     Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol);
219     thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&Bridge::setWebSocketChannel, thisPtr, peer, clientWrapper), taskMode);
220 }
221
222 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol)
223     : m_workerClientWrapper(workerClientWrapper)
224     , m_workerContext(workerContext)
225     , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
226     , m_taskMode(taskMode)
227     , m_peer(0)
228 {
229     ASSERT(m_workerClientWrapper.get());
230     setMethodNotCompleted();
231     m_loaderProxy.postTaskToLoader(createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel, this, m_workerClientWrapper, m_taskMode, url, protocol));
232     waitForMethodCompletion();
233     ASSERT(m_peer);
234 }
235
236 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
237 {
238     disconnect();
239 }
240
241 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer)
242 {
243     ASSERT(isMainThread());
244     ASSERT_UNUSED(context, context->isDocument());
245     ASSERT(peer);
246
247     peer->connect();
248 }
249
250 void WorkerThreadableWebSocketChannel::Bridge::connect()
251 {
252     ASSERT(m_workerClientWrapper);
253     ASSERT(m_peer);
254     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, m_peer));
255 }
256
257 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
258 {
259     ASSERT(isMainThread());
260     ASSERT_UNUSED(context, context->isDocument());
261     ASSERT(peer);
262
263     peer->send(message);
264 }
265
266 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
267 {
268     if (!m_workerClientWrapper)
269         return false;
270     ASSERT(m_peer);
271     setMethodNotCompleted();
272     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, m_peer, message));
273     waitForMethodCompletion();
274     ThreadableWebSocketChannelClientWrapper* clientWrapper = static_cast<ThreadableWebSocketChannelClientWrapper*>(m_workerClientWrapper.get());
275     return clientWrapper && clientWrapper->sent();
276 }
277
278 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
279 {
280     ASSERT(isMainThread());
281     ASSERT_UNUSED(context, context->isDocument());
282     ASSERT(peer);
283
284     peer->bufferedAmount();
285 }
286
287 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
288 {
289     if (!m_workerClientWrapper)
290         return 0;
291     ASSERT(m_peer);
292     setMethodNotCompleted();
293     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, m_peer));
294     waitForMethodCompletion();
295     ThreadableWebSocketChannelClientWrapper* clientWrapper = static_cast<ThreadableWebSocketChannelClientWrapper*>(m_workerClientWrapper.get());
296     if (clientWrapper)
297         return clientWrapper->bufferedAmount();
298     return 0;
299 }
300
301 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer)
302 {
303     ASSERT(isMainThread());
304     ASSERT_UNUSED(context, context->isDocument());
305     ASSERT(peer);
306
307     peer->close();
308 }
309
310 void WorkerThreadableWebSocketChannel::Bridge::close()
311 {
312     ASSERT(m_peer);
313     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, m_peer));
314 }
315
316 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer)
317 {
318     ASSERT(isMainThread());
319     ASSERT_UNUSED(context, context->isDocument());
320     ASSERT(peer);
321
322     delete peer;
323 }
324
325 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
326 {
327     clearClientWrapper();
328     if (m_peer) {
329         Peer* peer = m_peer;
330         m_peer = 0;
331         m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, peer));
332     }
333     m_workerContext = 0;
334 }
335
336 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
337 {
338     static_cast<ThreadableWebSocketChannelClientWrapper*>(m_workerClientWrapper.get())->clearClient();
339 }
340
341 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
342 {
343     ASSERT(m_workerClientWrapper);
344     static_cast<ThreadableWebSocketChannelClientWrapper*>(m_workerClientWrapper.get())->clearSyncMethodDone();
345 }
346
347 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
348 {
349     if (!m_workerContext)
350         return;
351     WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
352     MessageQueueWaitResult result = MessageQueueMessageReceived;
353     ThreadableWebSocketChannelClientWrapper* clientWrapper = static_cast<ThreadableWebSocketChannelClientWrapper*>(m_workerClientWrapper.get());
354     while (clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
355         result = runLoop.runInMode(m_workerContext.get(), m_taskMode);
356         clientWrapper = static_cast<ThreadableWebSocketChannelClientWrapper*>(m_workerClientWrapper.get());
357     }
358 }
359
360 } // namespace WebCore
361
362 #endif // ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)