Replace WTF::move with WTFMove
[WebKit-https.git] / Source / WebCore / Modules / websockets / WorkerThreadableWebSocketChannel.cpp
1 /*
2  * Copyright (C) 2011, 2012 Google Inc.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #include "config.h"
32
33 #if ENABLE(WEB_SOCKETS)
34
35 #include "WorkerThreadableWebSocketChannel.h"
36
37 #include "Blob.h"
38 #include "Document.h"
39 #include "ScriptExecutionContext.h"
40 #include "ThreadableWebSocketChannelClientWrapper.h"
41 #include "WebSocketChannel.h"
42 #include "WebSocketChannelClient.h"
43 #include "WorkerGlobalScope.h"
44 #include "WorkerLoaderProxy.h"
45 #include "WorkerRunLoop.h"
46 #include "WorkerThread.h"
47 #include <runtime/ArrayBuffer.h>
48 #include <wtf/MainThread.h>
49 #include <wtf/PassRefPtr.h>
50 #include <wtf/text/WTFString.h>
51
52 namespace WebCore {
53
54 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope* context, WebSocketChannelClient* client, const String& taskMode)
55     : m_workerGlobalScope(context)
56     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(context, client))
57     , m_bridge(Bridge::create(m_workerClientWrapper, m_workerGlobalScope, taskMode))
58 {
59     m_bridge->initialize();
60 }
61
62 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
63 {
64     if (m_bridge)
65         m_bridge->disconnect();
66 }
67
68 void WorkerThreadableWebSocketChannel::connect(const URL& url, const String& protocol)
69 {
70     if (m_bridge)
71         m_bridge->connect(url, protocol);
72 }
73
74 String WorkerThreadableWebSocketChannel::subprotocol()
75 {
76     ASSERT(m_workerClientWrapper);
77     return m_workerClientWrapper->subprotocol();
78 }
79
80 String WorkerThreadableWebSocketChannel::extensions()
81 {
82     ASSERT(m_workerClientWrapper);
83     return m_workerClientWrapper->extensions();
84 }
85
86 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message)
87 {
88     if (!m_bridge)
89         return ThreadableWebSocketChannel::SendFail;
90     return m_bridge->send(message);
91 }
92
93 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
94 {
95     if (!m_bridge)
96         return ThreadableWebSocketChannel::SendFail;
97     return m_bridge->send(binaryData, byteOffset, byteLength);
98 }
99
100 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(Blob& binaryData)
101 {
102     if (!m_bridge)
103         return ThreadableWebSocketChannel::SendFail;
104     return m_bridge->send(binaryData);
105 }
106
107 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
108 {
109     if (!m_bridge)
110         return 0;
111     return m_bridge->bufferedAmount();
112 }
113
114 void WorkerThreadableWebSocketChannel::close(int code, const String& reason)
115 {
116     if (m_bridge)
117         m_bridge->close(code, reason);
118 }
119
120 void WorkerThreadableWebSocketChannel::fail(const String& reason)
121 {
122     if (m_bridge)
123         m_bridge->fail(reason);
124 }
125
126 void WorkerThreadableWebSocketChannel::disconnect()
127 {
128     m_bridge->disconnect();
129     m_bridge = nullptr;
130 }
131
132 void WorkerThreadableWebSocketChannel::suspend()
133 {
134     m_workerClientWrapper->suspend();
135     if (m_bridge)
136         m_bridge->suspend();
137 }
138
139 void WorkerThreadableWebSocketChannel::resume()
140 {
141     m_workerClientWrapper->resume();
142     if (m_bridge)
143         m_bridge->resume();
144 }
145
146 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode)
147     : m_workerClientWrapper(clientWrapper)
148     , m_loaderProxy(loaderProxy)
149     , m_mainWebSocketChannel(WebSocketChannel::create(downcast<Document>(context), this))
150     , m_taskMode(taskMode)
151 {
152     ASSERT(isMainThread());
153 }
154
155 WorkerThreadableWebSocketChannel::Peer::~Peer()
156 {
157     ASSERT(isMainThread());
158     if (m_mainWebSocketChannel)
159         m_mainWebSocketChannel->disconnect();
160 }
161
162 void WorkerThreadableWebSocketChannel::Peer::connect(const URL& url, const String& protocol)
163 {
164     ASSERT(isMainThread());
165     if (!m_mainWebSocketChannel)
166         return;
167     m_mainWebSocketChannel->connect(url, protocol);
168 }
169
170 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
171 {
172     ASSERT(isMainThread());
173     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
174         return;
175
176     ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message);
177     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
178     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper, sendRequestResult] (ScriptExecutionContext&) {
179         workerClientWrapper->setSendRequestResult(sendRequestResult);
180     }, m_taskMode);
181 }
182
183 void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData)
184 {
185     ASSERT(isMainThread());
186     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
187         return;
188
189     ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData, 0, binaryData.byteLength());
190     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
191     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper, sendRequestResult] (ScriptExecutionContext&) {
192         workerClientWrapper->setSendRequestResult(sendRequestResult);
193     }, m_taskMode);
194 }
195
196 void WorkerThreadableWebSocketChannel::Peer::send(Blob& binaryData)
197 {
198     ASSERT(isMainThread());
199     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
200         return;
201
202     ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData);
203     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
204     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper, sendRequestResult] (ScriptExecutionContext&) {
205         workerClientWrapper->setSendRequestResult(sendRequestResult);
206     }, m_taskMode);
207 }
208
209 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
210 {
211     ASSERT(isMainThread());
212     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
213         return;
214
215     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
216     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
217     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper, bufferedAmount] (ScriptExecutionContext& context) {
218         ASSERT_UNUSED(context, context.isWorkerGlobalScope());
219         workerClientWrapper->setBufferedAmount(bufferedAmount);
220     }, m_taskMode);
221 }
222
223 void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason)
224 {
225     ASSERT(isMainThread());
226     if (!m_mainWebSocketChannel)
227         return;
228     m_mainWebSocketChannel->close(code, reason);
229 }
230
231 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason)
232 {
233     ASSERT(isMainThread());
234     if (!m_mainWebSocketChannel)
235         return;
236     m_mainWebSocketChannel->fail(reason);
237 }
238
239 void WorkerThreadableWebSocketChannel::Peer::disconnect()
240 {
241     ASSERT(isMainThread());
242     if (!m_mainWebSocketChannel)
243         return;
244     m_mainWebSocketChannel->disconnect();
245     m_mainWebSocketChannel = nullptr;
246 }
247
248 void WorkerThreadableWebSocketChannel::Peer::suspend()
249 {
250     ASSERT(isMainThread());
251     if (!m_mainWebSocketChannel)
252         return;
253     m_mainWebSocketChannel->suspend();
254 }
255
256 void WorkerThreadableWebSocketChannel::Peer::resume()
257 {
258     ASSERT(isMainThread());
259     if (!m_mainWebSocketChannel)
260         return;
261     m_mainWebSocketChannel->resume();
262 }
263
264 void WorkerThreadableWebSocketChannel::Peer::didConnect()
265 {
266     ASSERT(isMainThread());
267
268     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
269     StringCapture capturedSubprotocol(m_mainWebSocketChannel->subprotocol());
270     StringCapture capturedExtensions(m_mainWebSocketChannel->extensions());
271     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper, capturedSubprotocol, capturedExtensions] (ScriptExecutionContext& context) {
272         ASSERT_UNUSED(context, context.isWorkerGlobalScope());
273         workerClientWrapper->setSubprotocol(capturedSubprotocol.string());
274         workerClientWrapper->setExtensions(capturedExtensions.string());
275         workerClientWrapper->didConnect();
276     }, m_taskMode);
277 }
278
279 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
280 {
281     ASSERT(isMainThread());
282
283     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
284     StringCapture capturedMessage(message);
285     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper, capturedMessage] (ScriptExecutionContext& context) {
286         ASSERT_UNUSED(context, context.isWorkerGlobalScope());
287         workerClientWrapper->didReceiveMessage(capturedMessage.string());
288     }, m_taskMode);
289 }
290
291 void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(Vector<char>&& binaryData)
292 {
293     ASSERT(isMainThread());
294
295     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
296     Vector<char>* capturedData = new Vector<char>(WTFMove(binaryData));
297     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper, capturedData] (ScriptExecutionContext& context) {
298         ASSERT_UNUSED(context, context.isWorkerGlobalScope());
299         workerClientWrapper->didReceiveBinaryData(WTFMove(*capturedData));
300         delete capturedData;
301     }, m_taskMode);
302 }
303
304 void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned long bufferedAmount)
305 {
306     ASSERT(isMainThread());
307
308     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
309     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper, bufferedAmount] (ScriptExecutionContext& context) {
310         ASSERT_UNUSED(context, context.isWorkerGlobalScope());
311         workerClientWrapper->didUpdateBufferedAmount(bufferedAmount);
312     }, m_taskMode);
313 }
314
315 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
316 {
317     ASSERT(isMainThread());
318
319     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
320     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper] (ScriptExecutionContext& context) {
321         ASSERT_UNUSED(context, context.isWorkerGlobalScope());
322         workerClientWrapper->didStartClosingHandshake();
323     }, m_taskMode);
324 }
325
326 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason)
327 {
328     ASSERT(isMainThread());
329     m_mainWebSocketChannel = nullptr;
330
331     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
332     StringCapture capturedReason(reason);
333     m_loaderProxy.postTaskForModeToWorkerGlobalScope(
334         [workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion, code, capturedReason] (ScriptExecutionContext& context) {
335             ASSERT_UNUSED(context, context.isWorkerGlobalScope());
336             workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, capturedReason.string());
337         }, m_taskMode);
338 }
339
340 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError()
341 {
342     ASSERT(isMainThread());
343
344     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
345     m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper] (ScriptExecutionContext& context) {
346         ASSERT_UNUSED(context, context.isWorkerGlobalScope());
347         workerClientWrapper->didReceiveMessageError();
348     }, m_taskMode);
349 }
350
351 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerGlobalScope> workerGlobalScope, const String& taskMode)
352     : m_workerClientWrapper(workerClientWrapper)
353     , m_workerGlobalScope(workerGlobalScope)
354     , m_loaderProxy(m_workerGlobalScope->thread().workerLoaderProxy())
355     , m_taskMode(taskMode)
356     , m_peer(nullptr)
357 {
358     ASSERT(m_workerClientWrapper.get());
359 }
360
361 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
362 {
363     disconnect();
364 }
365
366 void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext& context, WorkerLoaderProxy* loaderProxy, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode)
367 {
368     ASSERT(isMainThread());
369     ASSERT_UNUSED(context, context.isDocument());
370
371     RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper;
372
373     Peer* peerPtr = Peer::create(clientWrapper, *loaderProxy, &context, taskMode);
374     bool sent = loaderProxy->postTaskForModeToWorkerGlobalScope({
375         ScriptExecutionContext::Task::CleanupTask,
376         [clientWrapper, loaderProxy, peerPtr] (ScriptExecutionContext& context) {
377             ASSERT_UNUSED(context, context.isWorkerGlobalScope());
378             if (clientWrapper->failedWebSocketChannelCreation()) {
379                 // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer.
380                 loaderProxy->postTaskToLoader([peerPtr] (ScriptExecutionContext& context) {
381                     ASSERT(isMainThread());
382                     ASSERT_UNUSED(context, context.isDocument());
383                     delete peerPtr;
384                 });
385             } else
386                 clientWrapper->didCreateWebSocketChannel(peerPtr);
387         }
388     }, taskMode);
389
390     if (!sent) {
391         clientWrapper->clearPeer();
392         delete peerPtr;
393     }
394 }
395
396 void WorkerThreadableWebSocketChannel::Bridge::initialize()
397 {
398     ASSERT(!m_peer);
399     setMethodNotCompleted();
400     Ref<Bridge> protect(*this);
401
402     WorkerLoaderProxy* loaderProxy = &m_loaderProxy;
403     RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper = m_workerClientWrapper;
404     StringCapture capturedTaskMode(m_taskMode);
405     m_loaderProxy.postTaskToLoader([loaderProxy, workerClientWrapper, capturedTaskMode] (ScriptExecutionContext& context) {
406         mainThreadInitialize(context, loaderProxy, workerClientWrapper, capturedTaskMode.string());
407     });
408     waitForMethodCompletion();
409
410     // m_peer may be null when the nested runloop exited before a peer has created.
411     m_peer = m_workerClientWrapper->peer();
412     if (!m_peer)
413         m_workerClientWrapper->setFailedWebSocketChannelCreation();
414 }
415
416 void WorkerThreadableWebSocketChannel::Bridge::connect(const URL& url, const String& protocol)
417 {
418     ASSERT(m_workerClientWrapper);
419     if (!m_peer)
420         return;
421
422     Peer* peer = m_peer;
423     URLCapture capturedURL(url);
424     StringCapture capturedProtocol(protocol);
425     m_loaderProxy.postTaskToLoader([peer, capturedURL, capturedProtocol] (ScriptExecutionContext& context) {
426         ASSERT(isMainThread());
427         ASSERT_UNUSED(context, context.isDocument());
428         ASSERT(peer);
429
430         peer->connect(capturedURL.url(), capturedProtocol.string());
431     });
432 }
433
434 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
435 {
436     if (!m_workerClientWrapper || !m_peer)
437         return ThreadableWebSocketChannel::SendFail;
438     setMethodNotCompleted();
439
440     Peer* peer = m_peer;
441     StringCapture capturedMessage(message);
442     m_loaderProxy.postTaskToLoader([peer, capturedMessage] (ScriptExecutionContext& context) {
443         ASSERT(isMainThread());
444         ASSERT_UNUSED(context, context.isDocument());
445         ASSERT(peer);
446
447         peer->send(capturedMessage.string());
448     });
449
450     Ref<Bridge> protect(*this);
451     waitForMethodCompletion();
452     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
453     if (!clientWrapper)
454         return ThreadableWebSocketChannel::SendFail;
455     return clientWrapper->sendRequestResult();
456 }
457
458 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength)
459 {
460     if (!m_workerClientWrapper || !m_peer)
461         return ThreadableWebSocketChannel::SendFail;
462
463     // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>.
464     Vector<char>* dataPtr = std::make_unique<Vector<char>>(byteLength).release();
465     if (binaryData.byteLength())
466         memcpy(dataPtr->data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength);
467     setMethodNotCompleted();
468
469     Peer* peer = m_peer;
470     m_loaderProxy.postTaskToLoader([peer, dataPtr] (ScriptExecutionContext& context) {
471         ASSERT(isMainThread());
472         ASSERT_UNUSED(context, context.isDocument());
473         ASSERT(peer);
474
475         std::unique_ptr<Vector<char>> data(dataPtr);
476         RefPtr<ArrayBuffer> arrayBuffer = ArrayBuffer::create(data->data(), data->size());
477         peer->send(*arrayBuffer);
478     });
479
480     Ref<Bridge> protect(*this);
481     waitForMethodCompletion();
482     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
483     if (!clientWrapper)
484         return ThreadableWebSocketChannel::SendFail;
485     return clientWrapper->sendRequestResult();
486 }
487
488 ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(Blob& binaryData)
489 {
490     if (!m_workerClientWrapper || !m_peer)
491         return ThreadableWebSocketChannel::SendFail;
492     setMethodNotCompleted();
493
494     Peer* peer = m_peer;
495     URLCapture capturedURL(binaryData.url());
496     StringCapture capturedType(binaryData.type());
497     long long size = binaryData.size();
498     m_loaderProxy.postTaskToLoader([peer, capturedURL, capturedType, size] (ScriptExecutionContext& context) {
499         ASSERT(isMainThread());
500         ASSERT_UNUSED(context, context.isDocument());
501         ASSERT(peer);
502
503         peer->send(Blob::deserialize(capturedURL.url(), capturedType.string(), size));
504     });
505
506     Ref<Bridge> protect(*this);
507     waitForMethodCompletion();
508     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
509     if (!clientWrapper)
510         return ThreadableWebSocketChannel::SendFail;
511     return clientWrapper->sendRequestResult();
512 }
513
514 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
515 {
516     if (!m_workerClientWrapper || !m_peer)
517         return 0;
518     setMethodNotCompleted();
519
520     Peer* peer = m_peer;
521     m_loaderProxy.postTaskToLoader([peer] (ScriptExecutionContext& context) {
522         ASSERT(isMainThread());
523         ASSERT_UNUSED(context, context.isDocument());
524         ASSERT(peer);
525
526         peer->bufferedAmount();
527     });
528
529     Ref<Bridge> protect(*this);
530     waitForMethodCompletion();
531     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
532     if (clientWrapper)
533         return clientWrapper->bufferedAmount();
534     return 0;
535 }
536
537 void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason)
538 {
539     if (!m_peer)
540         return;
541
542     Peer* peer = m_peer;
543     StringCapture capturedReason(reason);
544     m_loaderProxy.postTaskToLoader([peer, code, capturedReason] (ScriptExecutionContext& context) {
545         ASSERT(isMainThread());
546         ASSERT_UNUSED(context, context.isDocument());
547         ASSERT(peer);
548
549         peer->close(code, capturedReason.string());
550     });
551 }
552
553 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason)
554 {
555     if (!m_peer)
556         return;
557
558     Peer* peer = m_peer;
559     StringCapture capturedReason(reason);
560     m_loaderProxy.postTaskToLoader([peer, capturedReason] (ScriptExecutionContext& context) {
561         ASSERT(isMainThread());
562         ASSERT_UNUSED(context, context.isDocument());
563         ASSERT(peer);
564
565         peer->fail(capturedReason.string());
566     });
567 }
568
569 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
570 {
571     clearClientWrapper();
572     if (m_peer) {
573         Peer* peer = m_peer;
574         m_loaderProxy.postTaskToLoader([peer] (ScriptExecutionContext& context) {
575             ASSERT(isMainThread());
576             ASSERT_UNUSED(context, context.isDocument());
577             delete peer;
578         });
579         m_peer = nullptr;
580     }
581     m_workerGlobalScope = nullptr;
582 }
583
584 void WorkerThreadableWebSocketChannel::Bridge::suspend()
585 {
586     if (!m_peer)
587         return;
588
589     Peer* peer = m_peer;
590     m_loaderProxy.postTaskToLoader([peer] (ScriptExecutionContext& context) {
591         ASSERT(isMainThread());
592         ASSERT_UNUSED(context, context.isDocument());
593         ASSERT(peer);
594
595         peer->suspend();
596     });
597 }
598
599 void WorkerThreadableWebSocketChannel::Bridge::resume()
600 {
601     if (!m_peer)
602         return;
603
604     Peer* peer = m_peer;
605     m_loaderProxy.postTaskToLoader([peer] (ScriptExecutionContext& context) {
606         ASSERT(isMainThread());
607         ASSERT_UNUSED(context, context.isDocument());
608         ASSERT(peer);
609
610         peer->resume();
611     });
612 }
613
614 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
615 {
616     m_workerClientWrapper->clearClient();
617 }
618
619 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
620 {
621     ASSERT(m_workerClientWrapper);
622     m_workerClientWrapper->clearSyncMethodDone();
623 }
624
625 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
626 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
627 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
628 {
629     if (!m_workerGlobalScope)
630         return;
631     WorkerRunLoop& runLoop = m_workerGlobalScope->thread().runLoop();
632     MessageQueueWaitResult result = MessageQueueMessageReceived;
633     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
634     while (m_workerGlobalScope && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
635         result = runLoop.runInMode(m_workerGlobalScope.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerGlobalScope become null.
636         clientWrapper = m_workerClientWrapper.get();
637     }
638 }
639
640 } // namespace WebCore
641
642 #endif // ENABLE(WEB_SOCKETS)