2011-06-23 Darin Adler <darin@apple.com>
[WebKit-https.git] / Source / WebCore / websockets / WorkerThreadableWebSocketChannel.cpp
1 /*
2  * Copyright (C) 2009, 2010 Google Inc.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #include "config.h"
32
33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
34
35 #include "WorkerThreadableWebSocketChannel.h"
36
37 #include "CrossThreadTask.h"
38 #include "PlatformString.h"
39 #include "ScriptExecutionContext.h"
40 #include "ThreadableWebSocketChannelClientWrapper.h"
41 #include "WebSocketChannel.h"
42 #include "WebSocketChannelClient.h"
43 #include "WorkerContext.h"
44 #include "WorkerLoaderProxy.h"
45 #include "WorkerRunLoop.h"
46 #include "WorkerThread.h"
47
48 #include <wtf/PassRefPtr.h>
49
50 namespace WebCore {
51
52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol)
53     : m_workerContext(context)
54     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
55     , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode, url, protocol))
56 {
57 }
58
59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
60 {
61     if (m_bridge)
62         m_bridge->disconnect();
63 }
64
65 void WorkerThreadableWebSocketChannel::connect()
66 {
67     if (m_bridge)
68         m_bridge->connect();
69 }
70
71 bool WorkerThreadableWebSocketChannel::send(const String& message)
72 {
73     if (!m_bridge)
74         return false;
75     return m_bridge->send(message);
76 }
77
78 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
79 {
80     if (!m_bridge)
81         return 0;
82     return m_bridge->bufferedAmount();
83 }
84
85 void WorkerThreadableWebSocketChannel::close()
86 {
87     if (m_bridge)
88         m_bridge->close();
89 }
90
91 void WorkerThreadableWebSocketChannel::fail(const String& reason)
92 {
93     if (m_bridge)
94         m_bridge->fail(reason);
95 }
96
97 void WorkerThreadableWebSocketChannel::disconnect()
98 {
99     m_bridge->disconnect();
100     m_bridge.clear();
101 }
102
103 void WorkerThreadableWebSocketChannel::suspend()
104 {
105     m_workerClientWrapper->suspend();
106     if (m_bridge)
107         m_bridge->suspend();
108 }
109
110 void WorkerThreadableWebSocketChannel::resume()
111 {
112     m_workerClientWrapper->resume();
113     if (m_bridge)
114         m_bridge->resume();
115 }
116
117 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol)
118     : m_workerClientWrapper(clientWrapper)
119     , m_loaderProxy(loaderProxy)
120     , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol))
121     , m_taskMode(taskMode)
122 {
123     ASSERT(isMainThread());
124 }
125
126 WorkerThreadableWebSocketChannel::Peer::~Peer()
127 {
128     ASSERT(isMainThread());
129     if (m_mainWebSocketChannel)
130         m_mainWebSocketChannel->disconnect();
131 }
132
133 void WorkerThreadableWebSocketChannel::Peer::connect()
134 {
135     ASSERT(isMainThread());
136     if (!m_mainWebSocketChannel)
137         return;
138     m_mainWebSocketChannel->connect();
139 }
140
141 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent)
142 {
143     ASSERT_UNUSED(context, context->isWorkerContext());
144     workerClientWrapper->setSent(sent);
145 }
146
147 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
148 {
149     ASSERT(isMainThread());
150     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
151         return;
152     bool sent = m_mainWebSocketChannel->send(message);
153     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode);
154 }
155
156 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
157 {
158     ASSERT_UNUSED(context, context->isWorkerContext());
159     workerClientWrapper->setBufferedAmount(bufferedAmount);
160 }
161
162 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
163 {
164     ASSERT(isMainThread());
165     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
166         return;
167     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
168     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
169 }
170
171 void WorkerThreadableWebSocketChannel::Peer::close()
172 {
173     ASSERT(isMainThread());
174     if (!m_mainWebSocketChannel)
175         return;
176     m_mainWebSocketChannel->close();
177 }
178
179 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason)
180 {
181     ASSERT(isMainThread());
182     if (!m_mainWebSocketChannel)
183         return;
184     m_mainWebSocketChannel->fail(reason);
185 }
186
187 void WorkerThreadableWebSocketChannel::Peer::disconnect()
188 {
189     ASSERT(isMainThread());
190     if (!m_mainWebSocketChannel)
191         return;
192     m_mainWebSocketChannel->disconnect();
193     m_mainWebSocketChannel = 0;
194 }
195
196 void WorkerThreadableWebSocketChannel::Peer::suspend()
197 {
198     ASSERT(isMainThread());
199     if (!m_mainWebSocketChannel)
200         return;
201     m_mainWebSocketChannel->suspend();
202 }
203
204 void WorkerThreadableWebSocketChannel::Peer::resume()
205 {
206     ASSERT(isMainThread());
207     if (!m_mainWebSocketChannel)
208         return;
209     m_mainWebSocketChannel->resume();
210 }
211
212 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
213 {
214     ASSERT_UNUSED(context, context->isWorkerContext());
215     workerClientWrapper->didConnect();
216 }
217
218 void WorkerThreadableWebSocketChannel::Peer::didConnect()
219 {
220     ASSERT(isMainThread());
221     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode);
222 }
223
224 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
225 {
226     ASSERT_UNUSED(context, context->isWorkerContext());
227     workerClientWrapper->didReceiveMessage(message);
228 }
229
230 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
231 {
232     ASSERT(isMainThread());
233     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
234 }
235
236 static void workerContextDidStartClosingHandshake(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
237 {
238     ASSERT_UNUSED(context, context->isWorkerContext());
239     workerClientWrapper->didStartClosingHandshake();
240 }
241
242 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
243 {
244     ASSERT(isMainThread());
245     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidStartClosingHandshake, m_workerClientWrapper), m_taskMode);
246 }
247
248 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion)
249 {
250     ASSERT_UNUSED(context, context->isWorkerContext());
251     workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion);
252 }
253
254 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion)
255 {
256     ASSERT(isMainThread());
257     m_mainWebSocketChannel = 0;
258     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion), m_taskMode);
259 }
260
261 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
262 {
263     ASSERT_UNUSED(context, context->isWorkerContext());
264     thisPtr->m_peer = peer;
265     workerClientWrapper->setSyncMethodDone();
266 }
267
268 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode, const KURL& url, const String& protocol)
269 {
270     ASSERT(isMainThread());
271     ASSERT_UNUSED(context, context->isDocument());
272
273     RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper;
274
275     Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol);
276     thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(
277         createCallbackTask(&Bridge::setWebSocketChannel,
278                            AllowCrossThreadAccess(thisPtr),
279                            AllowCrossThreadAccess(peer), clientWrapper), taskMode);
280 }
281
282 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol)
283     : m_workerClientWrapper(workerClientWrapper)
284     , m_workerContext(workerContext)
285     , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
286     , m_taskMode(taskMode)
287     , m_peer(0)
288 {
289     ASSERT(m_workerClientWrapper.get());
290     setMethodNotCompleted();
291     m_loaderProxy.postTaskToLoader(
292         createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel,
293                            AllowCrossThreadAccess(this), m_workerClientWrapper, m_taskMode, url, protocol));
294     waitForMethodCompletion();
295     ASSERT(m_peer);
296 }
297
298 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
299 {
300     disconnect();
301 }
302
303 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer)
304 {
305     ASSERT(isMainThread());
306     ASSERT_UNUSED(context, context->isDocument());
307     ASSERT(peer);
308
309     peer->connect();
310 }
311
312 void WorkerThreadableWebSocketChannel::Bridge::connect()
313 {
314     ASSERT(m_workerClientWrapper);
315     ASSERT(m_peer);
316     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer)));
317 }
318
319 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
320 {
321     ASSERT(isMainThread());
322     ASSERT_UNUSED(context, context->isDocument());
323     ASSERT(peer);
324
325     peer->send(message);
326 }
327
328 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
329 {
330     if (!m_workerClientWrapper)
331         return false;
332     ASSERT(m_peer);
333     setMethodNotCompleted();
334     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message));
335     RefPtr<Bridge> protect(this);
336     waitForMethodCompletion();
337     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
338     return clientWrapper && clientWrapper->sent();
339 }
340
341 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
342 {
343     ASSERT(isMainThread());
344     ASSERT_UNUSED(context, context->isDocument());
345     ASSERT(peer);
346
347     peer->bufferedAmount();
348 }
349
350 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
351 {
352     if (!m_workerClientWrapper)
353         return 0;
354     ASSERT(m_peer);
355     setMethodNotCompleted();
356     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer)));
357     RefPtr<Bridge> protect(this);
358     waitForMethodCompletion();
359     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
360     if (clientWrapper)
361         return clientWrapper->bufferedAmount();
362     return 0;
363 }
364
365 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer)
366 {
367     ASSERT(isMainThread());
368     ASSERT_UNUSED(context, context->isDocument());
369     ASSERT(peer);
370
371     peer->close();
372 }
373
374 void WorkerThreadableWebSocketChannel::Bridge::close()
375 {
376     ASSERT(m_peer);
377     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer)));
378 }
379
380 void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason)
381 {
382     ASSERT(isMainThread());
383     ASSERT_UNUSED(context, context->isDocument());
384     ASSERT(peer);
385
386     peer->fail(reason);
387 }
388
389 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason)
390 {
391     ASSERT(m_peer);
392     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason));
393 }
394
395 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer)
396 {
397     ASSERT(isMainThread());
398     ASSERT_UNUSED(context, context->isDocument());
399     ASSERT(peer);
400
401     delete peer;
402 }
403
404 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
405 {
406     clearClientWrapper();
407     if (m_peer) {
408         Peer* peer = m_peer;
409         m_peer = 0;
410         m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, AllowCrossThreadAccess(peer)));
411     }
412     m_workerContext = 0;
413 }
414
415 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
416 {
417     ASSERT(isMainThread());
418     ASSERT_UNUSED(context, context->isDocument());
419     ASSERT(peer);
420
421     peer->suspend();
422 }
423
424 void WorkerThreadableWebSocketChannel::Bridge::suspend()
425 {
426     ASSERT(m_peer);
427     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer)));
428 }
429
430 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
431 {
432     ASSERT(isMainThread());
433     ASSERT_UNUSED(context, context->isDocument());
434     ASSERT(peer);
435
436     peer->resume();
437 }
438
439 void WorkerThreadableWebSocketChannel::Bridge::resume()
440 {
441     ASSERT(m_peer);
442     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer)));
443 }
444
445 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
446 {
447     m_workerClientWrapper->clearClient();
448 }
449
450 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
451 {
452     ASSERT(m_workerClientWrapper);
453     m_workerClientWrapper->clearSyncMethodDone();
454 }
455
456 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
457 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
458 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
459 {
460     if (!m_workerContext)
461         return;
462     WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
463     MessageQueueWaitResult result = MessageQueueMessageReceived;
464     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
465     while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
466         result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null.
467         clientWrapper = m_workerClientWrapper.get();
468     }
469 }
470
471 } // namespace WebCore
472
473 #endif // ENABLE(WEB_SOCKETS)