2011-05-24 Yuta Kitamura <yutak@chromium.org>
[WebKit-https.git] / Source / WebCore / websockets / WorkerThreadableWebSocketChannel.cpp
1 /*
2  * Copyright (C) 2009, 2010 Google Inc.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #include "config.h"
32
33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
34
35 #include "WorkerThreadableWebSocketChannel.h"
36
37 #include "CrossThreadTask.h"
38 #include "PlatformString.h"
39 #include "ScriptExecutionContext.h"
40 #include "ThreadableWebSocketChannelClientWrapper.h"
41 #include "WebSocketChannel.h"
42 #include "WebSocketChannelClient.h"
43 #include "WorkerContext.h"
44 #include "WorkerLoaderProxy.h"
45 #include "WorkerRunLoop.h"
46 #include "WorkerThread.h"
47
48 #include <wtf/PassRefPtr.h>
49
50 namespace WebCore {
51
52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol)
53     : m_workerContext(context)
54     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
55     , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode, url, protocol))
56 {
57 }
58
59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
60 {
61     if (m_bridge)
62         m_bridge->disconnect();
63 }
64
65 void WorkerThreadableWebSocketChannel::connect()
66 {
67     if (m_bridge)
68         m_bridge->connect();
69 }
70
71 bool WorkerThreadableWebSocketChannel::send(const String& message)
72 {
73     if (!m_bridge)
74         return false;
75     return m_bridge->send(message);
76 }
77
78 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
79 {
80     if (!m_bridge)
81         return 0;
82     return m_bridge->bufferedAmount();
83 }
84
85 void WorkerThreadableWebSocketChannel::close()
86 {
87     if (m_bridge)
88         m_bridge->close();
89 }
90
91 void WorkerThreadableWebSocketChannel::fail(const String& reason)
92 {
93     if (m_bridge)
94         m_bridge->fail(reason);
95 }
96
97 void WorkerThreadableWebSocketChannel::disconnect()
98 {
99     m_bridge->disconnect();
100     m_bridge.clear();
101 }
102
103 void WorkerThreadableWebSocketChannel::suspend()
104 {
105     m_workerClientWrapper->suspend();
106     if (m_bridge)
107         m_bridge->suspend();
108 }
109
110 void WorkerThreadableWebSocketChannel::resume()
111 {
112     m_workerClientWrapper->resume();
113     if (m_bridge)
114         m_bridge->resume();
115 }
116
117 WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol)
118     : m_workerClientWrapper(clientWrapper)
119     , m_loaderProxy(loaderProxy)
120     , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol))
121     , m_taskMode(taskMode)
122 {
123     ASSERT(isMainThread());
124 }
125
126 WorkerThreadableWebSocketChannel::Peer::~Peer()
127 {
128     ASSERT(isMainThread());
129     if (m_mainWebSocketChannel)
130         m_mainWebSocketChannel->disconnect();
131 }
132
133 void WorkerThreadableWebSocketChannel::Peer::connect()
134 {
135     ASSERT(isMainThread());
136     if (!m_mainWebSocketChannel)
137         return;
138     m_mainWebSocketChannel->connect();
139 }
140
141 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent)
142 {
143     ASSERT_UNUSED(context, context->isWorkerContext());
144     workerClientWrapper->setSent(sent);
145 }
146
147 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
148 {
149     ASSERT(isMainThread());
150     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
151         return;
152     bool sent = m_mainWebSocketChannel->send(message);
153     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode);
154 }
155
156 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
157 {
158     ASSERT_UNUSED(context, context->isWorkerContext());
159     workerClientWrapper->setBufferedAmount(bufferedAmount);
160 }
161
162 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
163 {
164     ASSERT(isMainThread());
165     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
166         return;
167     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
168     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
169 }
170
171 void WorkerThreadableWebSocketChannel::Peer::close()
172 {
173     ASSERT(isMainThread());
174     if (!m_mainWebSocketChannel)
175         return;
176     m_mainWebSocketChannel->close();
177     m_mainWebSocketChannel = 0;
178 }
179
180 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason)
181 {
182     ASSERT(isMainThread());
183     if (!m_mainWebSocketChannel)
184         return;
185     m_mainWebSocketChannel->fail(reason);
186 }
187
188 void WorkerThreadableWebSocketChannel::Peer::disconnect()
189 {
190     ASSERT(isMainThread());
191     if (!m_mainWebSocketChannel)
192         return;
193     m_mainWebSocketChannel->disconnect();
194     m_mainWebSocketChannel = 0;
195 }
196
197 void WorkerThreadableWebSocketChannel::Peer::suspend()
198 {
199     ASSERT(isMainThread());
200     if (!m_mainWebSocketChannel)
201         return;
202     m_mainWebSocketChannel->suspend();
203 }
204
205 void WorkerThreadableWebSocketChannel::Peer::resume()
206 {
207     ASSERT(isMainThread());
208     if (!m_mainWebSocketChannel)
209         return;
210     m_mainWebSocketChannel->resume();
211 }
212
213 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
214 {
215     ASSERT_UNUSED(context, context->isWorkerContext());
216     workerClientWrapper->didConnect();
217 }
218
219 void WorkerThreadableWebSocketChannel::Peer::didConnect()
220 {
221     ASSERT(isMainThread());
222     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode);
223 }
224
225 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
226 {
227     ASSERT_UNUSED(context, context->isWorkerContext());
228     workerClientWrapper->didReceiveMessage(message);
229 }
230
231 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
232 {
233     ASSERT(isMainThread());
234     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
235 }
236
237 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount)
238 {
239     ASSERT_UNUSED(context, context->isWorkerContext());
240     workerClientWrapper->didClose(unhandledBufferedAmount);
241 }
242
243 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount)
244 {
245     ASSERT(isMainThread());
246     m_mainWebSocketChannel = 0;
247     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount), m_taskMode);
248 }
249
250 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
251 {
252     ASSERT_UNUSED(context, context->isWorkerContext());
253     thisPtr->m_peer = peer;
254     workerClientWrapper->setSyncMethodDone();
255 }
256
257 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& taskMode, const KURL& url, const String& protocol)
258 {
259     ASSERT(isMainThread());
260     ASSERT_UNUSED(context, context->isDocument());
261
262     Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol);
263     thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(
264         createCallbackTask(&Bridge::setWebSocketChannel,
265                            AllowCrossThreadAccess(thisPtr),
266                            AllowCrossThreadAccess(peer), clientWrapper), taskMode);
267 }
268
269 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol)
270     : m_workerClientWrapper(workerClientWrapper)
271     , m_workerContext(workerContext)
272     , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
273     , m_taskMode(taskMode)
274     , m_peer(0)
275 {
276     ASSERT(m_workerClientWrapper.get());
277     setMethodNotCompleted();
278     m_loaderProxy.postTaskToLoader(
279         createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel,
280                            AllowCrossThreadAccess(this), m_workerClientWrapper, m_taskMode, url, protocol));
281     waitForMethodCompletion();
282     ASSERT(m_peer);
283 }
284
285 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
286 {
287     disconnect();
288 }
289
290 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer)
291 {
292     ASSERT(isMainThread());
293     ASSERT_UNUSED(context, context->isDocument());
294     ASSERT(peer);
295
296     peer->connect();
297 }
298
299 void WorkerThreadableWebSocketChannel::Bridge::connect()
300 {
301     ASSERT(m_workerClientWrapper);
302     ASSERT(m_peer);
303     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer)));
304 }
305
306 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
307 {
308     ASSERT(isMainThread());
309     ASSERT_UNUSED(context, context->isDocument());
310     ASSERT(peer);
311
312     peer->send(message);
313 }
314
315 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
316 {
317     if (!m_workerClientWrapper)
318         return false;
319     ASSERT(m_peer);
320     setMethodNotCompleted();
321     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message));
322     RefPtr<Bridge> protect(this);
323     waitForMethodCompletion();
324     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
325     return clientWrapper && clientWrapper->sent();
326 }
327
328 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
329 {
330     ASSERT(isMainThread());
331     ASSERT_UNUSED(context, context->isDocument());
332     ASSERT(peer);
333
334     peer->bufferedAmount();
335 }
336
337 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
338 {
339     if (!m_workerClientWrapper)
340         return 0;
341     ASSERT(m_peer);
342     setMethodNotCompleted();
343     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer)));
344     RefPtr<Bridge> protect(this);
345     waitForMethodCompletion();
346     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
347     if (clientWrapper)
348         return clientWrapper->bufferedAmount();
349     return 0;
350 }
351
352 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer)
353 {
354     ASSERT(isMainThread());
355     ASSERT_UNUSED(context, context->isDocument());
356     ASSERT(peer);
357
358     peer->close();
359 }
360
361 void WorkerThreadableWebSocketChannel::Bridge::close()
362 {
363     ASSERT(m_peer);
364     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer)));
365 }
366
367 void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason)
368 {
369     ASSERT(isMainThread());
370     ASSERT_UNUSED(context, context->isDocument());
371     ASSERT(peer);
372
373     peer->fail(reason);
374 }
375
376 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason)
377 {
378     ASSERT(m_peer);
379     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason));
380 }
381
382 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer)
383 {
384     ASSERT(isMainThread());
385     ASSERT_UNUSED(context, context->isDocument());
386     ASSERT(peer);
387
388     delete peer;
389 }
390
391 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
392 {
393     clearClientWrapper();
394     if (m_peer) {
395         Peer* peer = m_peer;
396         m_peer = 0;
397         m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, AllowCrossThreadAccess(peer)));
398     }
399     m_workerContext = 0;
400 }
401
402 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
403 {
404     ASSERT(isMainThread());
405     ASSERT_UNUSED(context, context->isDocument());
406     ASSERT(peer);
407
408     peer->suspend();
409 }
410
411 void WorkerThreadableWebSocketChannel::Bridge::suspend()
412 {
413     ASSERT(m_peer);
414     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer)));
415 }
416
417 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
418 {
419     ASSERT(isMainThread());
420     ASSERT_UNUSED(context, context->isDocument());
421     ASSERT(peer);
422
423     peer->resume();
424 }
425
426 void WorkerThreadableWebSocketChannel::Bridge::resume()
427 {
428     ASSERT(m_peer);
429     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer)));
430 }
431
432 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
433 {
434     m_workerClientWrapper->clearClient();
435 }
436
437 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
438 {
439     ASSERT(m_workerClientWrapper);
440     m_workerClientWrapper->clearSyncMethodDone();
441 }
442
443 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
444 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
445 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
446 {
447     if (!m_workerContext)
448         return;
449     WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
450     MessageQueueWaitResult result = MessageQueueMessageReceived;
451     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
452     while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
453         result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null.
454         clientWrapper = m_workerClientWrapper.get();
455     }
456 }
457
458 } // namespace WebCore
459
460 #endif // ENABLE(WEB_SOCKETS)