143c8f2b0a7aff75d4012276cf7e8926fb46f220
[WebKit-https.git] / Source / WebCore / Modules / websockets / WorkerThreadableWebSocketChannel.cpp
1 /*
2  * Copyright (C) 2011, 2012 Google Inc.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #include "config.h"
32
33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
34
35 #include "WorkerThreadableWebSocketChannel.h"
36
37 #include "Blob.h"
38 #include "CrossThreadTask.h"
39 #include "Document.h"
40 #include "ScriptExecutionContext.h"
41 #include "ThreadableWebSocketChannelClientWrapper.h"
42 #include "WebSocketChannel.h"
43 #include "WebSocketChannelClient.h"
44 #include "WorkerContext.h"
45 #include "WorkerLoaderProxy.h"
46 #include "WorkerRunLoop.h"
47 #include "WorkerThread.h"
48 #include <wtf/ArrayBuffer.h>
49 #include <wtf/MainThread.h>
50 #include <wtf/PassRefPtr.h>
51 #include <wtf/text/WTFString.h>
52
53 namespace WebCore {
54
55 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode)
56     : m_workerContext(context)
57     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(context, client))
58     , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode))
59 {
60     m_bridge->initialize();
61 }
62
63 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
64 {
65     if (m_bridge)
66         m_bridge->disconnect();
67 }
68
69 void WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
70 {
71     if (m_bridge)
72         m_bridge->connect(url, protocol);
73 }
74
75 String WorkerThreadableWebSocketChannel::subprotocol()
76 {
77     ASSERT(m_workerClientWrapper);
78     return m_workerClientWrapper->subprotocol();
79 }
80
81 String WorkerThreadableWebSocketChannel::extensions()
82 {
83     ASSERT(m_workerClientWrapper);
84     return m_workerClientWrapper->extensions();
85 }
86
87 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message)
88 {
89     if (!m_bridge)
90         return ThreadableWebSocketChannel::SendFail;
91     return m_bridge->send(message);
92 }
93
94 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
95 {
96     if (!m_bridge)
97         return ThreadableWebSocketChannel::SendFail;
98     return m_bridge->send(binaryData, byteOffset, byteLength);
99 }
100
101 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const Blob& binaryData)
102 {
103     if (!m_bridge)
104         return ThreadableWebSocketChannel::SendFail;
105     return m_bridge->send(binaryData);
106 }
107
108 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
109 {
110     if (!m_bridge)
111         return 0;
112     return m_bridge->bufferedAmount();
113 }
114
115 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
116 {
117     if (m_bridge)
118         m_bridge->close(code, reason);
119 }
120
121 void WorkerThreadableWebSocketChannel::fail(const String& reason)
122 {
123     if (m_bridge)
124         m_bridge->fail(reason);
125 }
126
127 void WorkerThreadableWebSocketChannel::disconnect()
128 {
129     m_bridge->disconnect();
130     m_bridge.clear();
131 }
132
133 void WorkerThreadableWebSocketChannel::suspend()
134 {
135     m_workerClientWrapper->suspend();
136     if (m_bridge)
137         m_bridge->suspend();
138 }
139
140 void WorkerThreadableWebSocketChannel::resume()
141 {
142     m_workerClientWrapper->resume();
143     if (m_bridge)
144         m_bridge->resume();
145 }
146
147 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode)
148     : m_workerClientWrapper(clientWrapper)
149     , m_loaderProxy(loaderProxy)
150     , m_mainWebSocketChannel(WebSocketChannel::create(static_cast<Document*>(context), this))
151     , m_taskMode(taskMode)
152 {
153     ASSERT(isMainThread());
154 }
155
156 WorkerThreadableWebSocketChannel::Peer::~Peer()
157 {
158     ASSERT(isMainThread());
159     if (m_mainWebSocketChannel)
160         m_mainWebSocketChannel->disconnect();
161 }
162
163 void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
164 {
165     ASSERT(isMainThread());
166     if (!m_mainWebSocketChannel)
167         return;
168     m_mainWebSocketChannel->connect(url, protocol);
169 }
170
171 static void workerContextDidSend(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, ThreadableWebSocketChannel::SendResult sendRequestResult)
172 {
173     ASSERT_UNUSED(context, context->isWorkerContext());
174     workerClientWrapper->setSendRequestResult(sendRequestResult);
175 }
176
177 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
178 {
179     ASSERT(isMainThread());
180     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
181         return;
182     ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
183     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
184 }
185
186 void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData)
187 {
188     ASSERT(isMainThread());
189     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
190         return;
191     ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData, 0, binaryData.byteLength());
192     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
193 }
194
195 void WorkerThreadableWebSocketChannel::Peer::send(const Blob& binaryData)
196 {
197     ASSERT(isMainThread());
198     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
199         return;
200     ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData);
201     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sendRequestResult), m_taskMode);
202 }
203
204 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
205 {
206     ASSERT_UNUSED(context, context->isWorkerContext());
207     workerClientWrapper->setBufferedAmount(bufferedAmount);
208 }
209
210 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
211 {
212     ASSERT(isMainThread());
213     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
214         return;
215     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
216     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
217 }
218
219 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
220 {
221     ASSERT(isMainThread());
222     if (!m_mainWebSocketChannel)
223         return;
224     m_mainWebSocketChannel->close(code, reason);
225 }
226
227 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason)
228 {
229     ASSERT(isMainThread());
230     if (!m_mainWebSocketChannel)
231         return;
232     m_mainWebSocketChannel->fail(reason);
233 }
234
235 void WorkerThreadableWebSocketChannel::Peer::disconnect()
236 {
237     ASSERT(isMainThread());
238     if (!m_mainWebSocketChannel)
239         return;
240     m_mainWebSocketChannel->disconnect();
241     m_mainWebSocketChannel = 0;
242 }
243
244 void WorkerThreadableWebSocketChannel::Peer::suspend()
245 {
246     ASSERT(isMainThread());
247     if (!m_mainWebSocketChannel)
248         return;
249     m_mainWebSocketChannel->suspend();
250 }
251
252 void WorkerThreadableWebSocketChannel::Peer::resume()
253 {
254     ASSERT(isMainThread());
255     if (!m_mainWebSocketChannel)
256         return;
257     m_mainWebSocketChannel->resume();
258 }
259
260 static void workerContextDidConnect(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& subprotocol, const String& extensions)
261 {
262     ASSERT_UNUSED(context, context->isWorkerContext());
263     workerClientWrapper->setSubprotocol(subprotocol);
264     workerClientWrapper->setExtensions(extensions);
265     workerClientWrapper->didConnect();
266 }
267
268 void WorkerThreadableWebSocketChannel::Peer::didConnect()
269 {
270     ASSERT(isMainThread());
271     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper, m_mainWebSocketChannel->subprotocol(), m_mainWebSocketChannel->extensions()), m_taskMode);
272 }
273
274 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
275 {
276     ASSERT_UNUSED(context, context->isWorkerContext());
277     workerClientWrapper->didReceiveMessage(message);
278 }
279
280 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
281 {
282     ASSERT(isMainThread());
283     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
284 }
285
286 static void workerContextDidReceiveBinaryData(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassOwnPtr<Vector<char> > binaryData)
287 {
288     ASSERT_UNUSED(context, context->isWorkerContext());
289     workerClientWrapper->didReceiveBinaryData(binaryData);
290 }
291
292 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(PassOwnPtr<Vector<char> > binaryData)
293 {
294     ASSERT(isMainThread());
295     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveBinaryData, m_workerClientWrapper, binaryData), m_taskMode);
296 }
297
298 static void workerContextDidUpdateBufferedAmount(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
299 {
300     ASSERT_UNUSED(context, context->isWorkerContext());
301     workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
302 }
303
304 void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount)
305 {
306     ASSERT(isMainThread());
307     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidUpdateBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
308 }
309
310 static void workerContextDidStartClosingHandshake(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
311 {
312     ASSERT_UNUSED(context, context->isWorkerContext());
313     workerClientWrapper->didStartClosingHandshake();
314 }
315
316 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
317 {
318     ASSERT(isMainThread());
319     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidStartClosingHandshake, m_workerClientWrapper), m_taskMode);
320 }
321
322 static void workerContextDidClose(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
323 {
324     ASSERT_UNUSED(context, context->isWorkerContext());
325     workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason);
326 }
327
328 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
329 {
330     ASSERT(isMainThread());
331     m_mainWebSocketChannel = 0;
332     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, reason), m_taskMode);
333 }
334
335 static void workerContextDidReceiveMessageError(ScriptExecutionContext* context, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
336 {
337     ASSERT_UNUSED(context, context->isWorkerContext());
338     workerClientWrapper->didReceiveMessageError();
339 }
340
341 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
342 {
343      ASSERT(isMainThread());
344      m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessageError, m_workerClientWrapper), m_taskMode);
345 }
346
347 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode)
348     : m_workerClientWrapper(workerClientWrapper)
349     , m_workerContext(workerContext)
350     , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
351     , m_taskMode(taskMode)
352     , m_peer(0)
353 {
354     ASSERT(m_workerClientWrapper.get());
355 }
356
357 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
358 {
359     disconnect();
360 }
361
362 class WorkerThreadableWebSocketChannel::WorkerContextDidInitializeTask : public ScriptExecutionContext::Task {
363 public:
364     static PassOwnPtr<ScriptExecutionContext::Task> create(WorkerThreadableWebSocketChannel::Peer* peer,
365                                                            WorkerLoaderProxy* loaderProxy,
366                                                            PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
367     {
368         return adoptPtr(new WorkerContextDidInitializeTask(peer, loaderProxy, workerClientWrapper));
369     }
370
371     virtual ~WorkerContextDidInitializeTask() { }
372     virtual void performTask(ScriptExecutionContext* context) OVERRIDE
373     {
374         ASSERT_UNUSED(context, context->isWorkerContext());
375         if (m_workerClientWrapper->failedWebSocketChannelCreation()) {
376             // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer.
377             OwnPtr<WorkerThreadableWebSocketChannel::Peer> peer = adoptPtr(m_peer);
378             m_peer = 0;
379             m_loaderProxy->postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release()));
380         } else
381             m_workerClientWrapper->didCreateWebSocketChannel(m_peer);
382     }
383     virtual bool isCleanupTask() const OVERRIDE { return true; }
384
385 private:
386     WorkerContextDidInitializeTask(WorkerThreadableWebSocketChannel::Peer* peer,
387                                    WorkerLoaderProxy* loaderProxy,
388                                    PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
389         : m_peer(peer)
390         , m_loaderProxy(loaderProxy)
391         , m_workerClientWrapper(workerClientWrapper)
392     {
393     }
394
395     WorkerThreadableWebSocketChannel::Peer* m_peer;
396     WorkerLoaderProxy* m_loaderProxy;
397     RefPtr<ThreadableWebSocketChannelClientWrapper> m_workerClientWrapper;
398 };
399
400 void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext* context, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode)
401 {
402     ASSERT(isMainThread());
403     ASSERT_UNUSED(context, context->isDocument());
404
405     RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper;
406
407     Peer* peer = Peer::create(clientWrapper, *loaderProxy, context, taskMode);
408     bool sent = loaderProxy->postTaskForModeToWorkerContext(
409         WorkerThreadableWebSocketChannel::WorkerContextDidInitializeTask::create(peer, loaderProxy, clientWrapper), taskMode);
410     if (!sent) {
411         clientWrapper->clearPeer();
412         delete peer;
413     }
414 }
415
416 void WorkerThreadableWebSocketChannel::Bridge::initialize()
417 {
418     ASSERT(!m_peer);
419     setMethodNotCompleted();
420     RefPtr<Bridge> protect(this);
421     m_loaderProxy.postTaskToLoader(
422         createCallbackTask(&Bridge::mainThreadInitialize,
423                            AllowCrossThreadAccess(&m_loaderProxy), m_workerClientWrapper, m_taskMode));
424     waitForMethodCompletion();
425     // m_peer may be null when the nested runloop exited before a peer has created.
426     m_peer = m_workerClientWrapper->peer();
427     if (!m_peer)
428         m_workerClientWrapper->setFailedWebSocketChannelCreation();
429 }
430
431 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& protocol)
432 {
433     ASSERT(isMainThread());
434     ASSERT_UNUSED(context, context->isDocument());
435     ASSERT(peer);
436
437     peer->connect(url, protocol);
438 }
439
440 void WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
441 {
442     ASSERT(m_workerClientWrapper);
443     if (!m_peer)
444         return;
445     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer), url, protocol));
446 }
447
448 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
449 {
450     ASSERT(isMainThread());
451     ASSERT_UNUSED(context, context->isDocument());
452     ASSERT(peer);
453
454     peer->send(message);
455 }
456
457 void WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer(ScriptExecutionContext* context, Peer* peer, PassOwnPtr<Vector<char> > data)
458 {
459     ASSERT(isMainThread());
460     ASSERT_UNUSED(context, context->isDocument());
461     ASSERT(peer);
462
463     RefPtr<ArrayBuffer> arrayBuffer = ArrayBuffer::create(data->data(), data->size());
464     peer->send(*arrayBuffer);
465 }
466
467 void WorkerThreadableWebSocketChannel::mainThreadSendBlob(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& type, long long size)
468 {
469     ASSERT(isMainThread());
470     ASSERT_UNUSED(context, context->isDocument());
471     ASSERT(peer);
472
473     RefPtr<Blob> blob = Blob::create(url, type, size);
474     peer->send(*blob);
475 }
476
477 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
478 {
479     if (!m_workerClientWrapper || !m_peer)
480         return ThreadableWebSocketChannel::SendFail;
481     setMethodNotCompleted();
482     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message));
483     RefPtr<Bridge> protect(this);
484     waitForMethodCompletion();
485     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
486     if (!clientWrapper)
487         return ThreadableWebSocketChannel::SendFail;
488     return clientWrapper->sendRequestResult();
489 }
490
491 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
492 {
493     if (!m_workerClientWrapper || !m_peer)
494         return ThreadableWebSocketChannel::SendFail;
495     // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
496     OwnPtr<Vector<char> > data = adoptPtr(new Vector<char>(byteLength));
497     if (binaryData.byteLength())
498         memcpy(data->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
499     setMethodNotCompleted();
500     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendArrayBuffer, AllowCrossThreadAccess(m_peer), data.release()));
501     RefPtr<Bridge> protect(this);
502     waitForMethodCompletion();
503     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
504     if (!clientWrapper)
505         return ThreadableWebSocketChannel::SendFail;
506     return clientWrapper->sendRequestResult();
507 }
508
509 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const Blob& binaryData)
510 {
511     if (!m_workerClientWrapper || !m_peer)
512         return ThreadableWebSocketChannel::SendFail;
513     setMethodNotCompleted();
514     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSendBlob, AllowCrossThreadAccess(m_peer), binaryData.url(), binaryData.type(), binaryData.size()));
515     RefPtr<Bridge> protect(this);
516     waitForMethodCompletion();
517     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
518     if (!clientWrapper)
519         return ThreadableWebSocketChannel::SendFail;
520     return clientWrapper->sendRequestResult();
521 }
522
523 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
524 {
525     ASSERT(isMainThread());
526     ASSERT_UNUSED(context, context->isDocument());
527     ASSERT(peer);
528
529     peer->bufferedAmount();
530 }
531
532 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
533 {
534     if (!m_workerClientWrapper || !m_peer)
535         return 0;
536     setMethodNotCompleted();
537     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer)));
538     RefPtr<Bridge> protect(this);
539     waitForMethodCompletion();
540     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
541     if (clientWrapper)
542         return clientWrapper->bufferedAmount();
543     return 0;
544 }
545
546 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer, int code, const String& reason)
547 {
548     ASSERT(isMainThread());
549     ASSERT_UNUSED(context, context->isDocument());
550     ASSERT(peer);
551
552     peer->close(code, reason);
553 }
554
555 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
556 {
557     if (!m_peer)
558         return;
559     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer), code, reason));
560 }
561
562 void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason)
563 {
564     ASSERT(isMainThread());
565     ASSERT_UNUSED(context, context->isDocument());
566     ASSERT(peer);
567
568     peer->fail(reason);
569 }
570
571 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason)
572 {
573     if (!m_peer)
574         return;
575     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason));
576 }
577
578 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, PassOwnPtr<Peer> peer)
579 {
580     ASSERT(isMainThread());
581     ASSERT_UNUSED(context, context->isDocument());
582     ASSERT_UNUSED(peer, peer);
583
584     // Peer object will be deleted even if the task does not run in the main thread's cleanup period, because
585     // the destructor for the task object (created by createCallbackTask()) will automatically delete the peer.
586 }
587
588 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
589 {
590     clearClientWrapper();
591     if (m_peer) {
592         OwnPtr<Peer> peer = adoptPtr(m_peer);
593         m_peer = 0;
594         m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadDestroy, peer.release()));
595     }
596     m_workerContext = 0;
597 }
598
599 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
600 {
601     ASSERT(isMainThread());
602     ASSERT_UNUSED(context, context->isDocument());
603     ASSERT(peer);
604
605     peer->suspend();
606 }
607
608 void WorkerThreadableWebSocketChannel::Bridge::suspend()
609 {
610     if (!m_peer)
611         return;
612     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer)));
613 }
614
615 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
616 {
617     ASSERT(isMainThread());
618     ASSERT_UNUSED(context, context->isDocument());
619     ASSERT(peer);
620
621     peer->resume();
622 }
623
624 void WorkerThreadableWebSocketChannel::Bridge::resume()
625 {
626     if (!m_peer)
627         return;
628     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer)));
629 }
630
631 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
632 {
633     m_workerClientWrapper->clearClient();
634 }
635
636 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
637 {
638     ASSERT(m_workerClientWrapper);
639     m_workerClientWrapper->clearSyncMethodDone();
640 }
641
642 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
643 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
644 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
645 {
646     if (!m_workerContext)
647         return;
648     WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
649     MessageQueueWaitResult result = MessageQueueMessageReceived;
650     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
651     while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
652         result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null.
653         clientWrapper = m_workerClientWrapper.get();
654     }
655 }
656
657 } // namespace WebCore
658
659 #endif // ENABLE(WEB_SOCKETS)