WebSocket: Add binaryType attribute
[WebKit-https.git] / Source / WebCore / websockets / WorkerThreadableWebSocketChannel.cpp
1 /*
2  * Copyright (C) 2009, 2010 Google Inc.  All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google Inc. nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30
31 #include "config.h"
32
33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
34
35 #include "WorkerThreadableWebSocketChannel.h"
36
37 #include "CrossThreadTask.h"
38 #include "PlatformString.h"
39 #include "ScriptExecutionContext.h"
40 #include "ThreadableWebSocketChannelClientWrapper.h"
41 #include "WebSocketChannel.h"
42 #include "WebSocketChannelClient.h"
43 #include "WorkerContext.h"
44 #include "WorkerLoaderProxy.h"
45 #include "WorkerRunLoop.h"
46 #include "WorkerThread.h"
47 #include <wtf/MainThread.h>
48 #include <wtf/PassRefPtr.h>
49
50 namespace WebCore {
51
52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode)
53     : m_workerContext(context)
54     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
55     , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode))
56 {
57 }
58
59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
60 {
61     if (m_bridge)
62         m_bridge->disconnect();
63 }
64
65 bool WorkerThreadableWebSocketChannel::useHixie76Protocol()
66 {
67     ASSERT(m_workerClientWrapper);
68     return m_workerClientWrapper->useHixie76Protocol();
69 }
70
71 void WorkerThreadableWebSocketChannel::connect(const KURL& url, const String& protocol)
72 {
73     if (m_bridge)
74         m_bridge->connect(url, protocol);
75 }
76
77 bool WorkerThreadableWebSocketChannel::send(const String& message)
78 {
79     if (!m_bridge)
80         return false;
81     return m_bridge->send(message);
82 }
83
84 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
85 {
86     if (!m_bridge)
87         return 0;
88     return m_bridge->bufferedAmount();
89 }
90
91 void WorkerThreadableWebSocketChannel::close()
92 {
93     if (m_bridge)
94         m_bridge->close();
95 }
96
97 void WorkerThreadableWebSocketChannel::fail(const String& reason)
98 {
99     if (m_bridge)
100         m_bridge->fail(reason);
101 }
102
103 void WorkerThreadableWebSocketChannel::disconnect()
104 {
105     m_bridge->disconnect();
106     m_bridge.clear();
107 }
108
109 void WorkerThreadableWebSocketChannel::suspend()
110 {
111     m_workerClientWrapper->suspend();
112     if (m_bridge)
113         m_bridge->suspend();
114 }
115
116 void WorkerThreadableWebSocketChannel::resume()
117 {
118     m_workerClientWrapper->resume();
119     if (m_bridge)
120         m_bridge->resume();
121 }
122
123 WorkerThreadableWebSocketChannel::Peer::Peer(PassRefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode)
124     : m_workerClientWrapper(clientWrapper)
125     , m_loaderProxy(loaderProxy)
126     , m_mainWebSocketChannel(WebSocketChannel::create(context, this))
127     , m_taskMode(taskMode)
128 {
129     ASSERT(isMainThread());
130 }
131
132 WorkerThreadableWebSocketChannel::Peer::~Peer()
133 {
134     ASSERT(isMainThread());
135     if (m_mainWebSocketChannel)
136         m_mainWebSocketChannel->disconnect();
137 }
138
139 bool WorkerThreadableWebSocketChannel::Peer::useHixie76Protocol()
140 {
141     ASSERT(isMainThread());
142     ASSERT(m_mainWebSocketChannel);
143     return m_mainWebSocketChannel->useHixie76Protocol();
144 }
145
146 void WorkerThreadableWebSocketChannel::Peer::connect(const KURL& url, const String& protocol)
147 {
148     ASSERT(isMainThread());
149     if (!m_mainWebSocketChannel)
150         return;
151     m_mainWebSocketChannel->connect(url, protocol);
152 }
153
154 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent)
155 {
156     ASSERT_UNUSED(context, context->isWorkerContext());
157     workerClientWrapper->setSent(sent);
158 }
159
160 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
161 {
162     ASSERT(isMainThread());
163     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
164         return;
165     bool sent = m_mainWebSocketChannel->send(message);
166     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode);
167 }
168
169 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
170 {
171     ASSERT_UNUSED(context, context->isWorkerContext());
172     workerClientWrapper->setBufferedAmount(bufferedAmount);
173 }
174
175 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
176 {
177     ASSERT(isMainThread());
178     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
179         return;
180     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
181     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
182 }
183
184 void WorkerThreadableWebSocketChannel::Peer::close()
185 {
186     ASSERT(isMainThread());
187     if (!m_mainWebSocketChannel)
188         return;
189     m_mainWebSocketChannel->close();
190 }
191
192 void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason)
193 {
194     ASSERT(isMainThread());
195     if (!m_mainWebSocketChannel)
196         return;
197     m_mainWebSocketChannel->fail(reason);
198 }
199
200 void WorkerThreadableWebSocketChannel::Peer::disconnect()
201 {
202     ASSERT(isMainThread());
203     if (!m_mainWebSocketChannel)
204         return;
205     m_mainWebSocketChannel->disconnect();
206     m_mainWebSocketChannel = 0;
207 }
208
209 void WorkerThreadableWebSocketChannel::Peer::suspend()
210 {
211     ASSERT(isMainThread());
212     if (!m_mainWebSocketChannel)
213         return;
214     m_mainWebSocketChannel->suspend();
215 }
216
217 void WorkerThreadableWebSocketChannel::Peer::resume()
218 {
219     ASSERT(isMainThread());
220     if (!m_mainWebSocketChannel)
221         return;
222     m_mainWebSocketChannel->resume();
223 }
224
225 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
226 {
227     ASSERT_UNUSED(context, context->isWorkerContext());
228     workerClientWrapper->didConnect();
229 }
230
231 void WorkerThreadableWebSocketChannel::Peer::didConnect()
232 {
233     ASSERT(isMainThread());
234     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode);
235 }
236
237 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
238 {
239     ASSERT_UNUSED(context, context->isWorkerContext());
240     workerClientWrapper->didReceiveMessage(message);
241 }
242
243 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
244 {
245     ASSERT(isMainThread());
246     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
247 }
248
249 static void workerContextDidStartClosingHandshake(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
250 {
251     ASSERT_UNUSED(context, context->isWorkerContext());
252     workerClientWrapper->didStartClosingHandshake();
253 }
254
255 void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake()
256 {
257     ASSERT(isMainThread());
258     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidStartClosingHandshake, m_workerClientWrapper), m_taskMode);
259 }
260
261 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount, WebSocketChannelClient::ClosingHandshakeCompletionStatus closingHandshakeCompletion)
262 {
263     ASSERT_UNUSED(context, context->isWorkerContext());
264     workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion);
265 }
266
267 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion)
268 {
269     ASSERT(isMainThread());
270     m_mainWebSocketChannel = 0;
271     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount, closingHandshakeCompletion), m_taskMode);
272 }
273
274 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool useHixie76Protocol)
275 {
276     ASSERT_UNUSED(context, context->isWorkerContext());
277     thisPtr->m_peer = peer;
278     workerClientWrapper->setUseHixie76Protocol(useHixie76Protocol);
279     workerClientWrapper->setSyncMethodDone();
280 }
281
282 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, PassRefPtr<ThreadableWebSocketChannelClientWrapper> prpClientWrapper, const String& taskMode)
283 {
284     ASSERT(isMainThread());
285     ASSERT_UNUSED(context, context->isDocument());
286
287     RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper = prpClientWrapper;
288
289     Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode);
290     thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(
291         createCallbackTask(&Bridge::setWebSocketChannel,
292                            AllowCrossThreadAccess(thisPtr),
293                            AllowCrossThreadAccess(peer), clientWrapper, peer->useHixie76Protocol()), taskMode);
294 }
295
296 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode)
297     : m_workerClientWrapper(workerClientWrapper)
298     , m_workerContext(workerContext)
299     , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
300     , m_taskMode(taskMode)
301     , m_peer(0)
302 {
303     ASSERT(m_workerClientWrapper.get());
304     setMethodNotCompleted();
305     m_loaderProxy.postTaskToLoader(
306         createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel,
307                            AllowCrossThreadAccess(this), m_workerClientWrapper, m_taskMode));
308     waitForMethodCompletion();
309     ASSERT(m_peer);
310 }
311
312 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
313 {
314     disconnect();
315 }
316
317 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer, const KURL& url, const String& protocol)
318 {
319     ASSERT(isMainThread());
320     ASSERT_UNUSED(context, context->isDocument());
321     ASSERT(peer);
322
323     peer->connect(url, protocol);
324 }
325
326 void WorkerThreadableWebSocketChannel::Bridge::connect(const KURL& url, const String& protocol)
327 {
328     ASSERT(m_workerClientWrapper);
329     ASSERT(m_peer);
330     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, AllowCrossThreadAccess(m_peer), url, protocol));
331 }
332
333 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
334 {
335     ASSERT(isMainThread());
336     ASSERT_UNUSED(context, context->isDocument());
337     ASSERT(peer);
338
339     peer->send(message);
340 }
341
342 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
343 {
344     if (!m_workerClientWrapper)
345         return false;
346     ASSERT(m_peer);
347     setMethodNotCompleted();
348     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, AllowCrossThreadAccess(m_peer), message));
349     RefPtr<Bridge> protect(this);
350     waitForMethodCompletion();
351     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
352     return clientWrapper && clientWrapper->sent();
353 }
354
355 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
356 {
357     ASSERT(isMainThread());
358     ASSERT_UNUSED(context, context->isDocument());
359     ASSERT(peer);
360
361     peer->bufferedAmount();
362 }
363
364 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
365 {
366     if (!m_workerClientWrapper)
367         return 0;
368     ASSERT(m_peer);
369     setMethodNotCompleted();
370     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, AllowCrossThreadAccess(m_peer)));
371     RefPtr<Bridge> protect(this);
372     waitForMethodCompletion();
373     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
374     if (clientWrapper)
375         return clientWrapper->bufferedAmount();
376     return 0;
377 }
378
379 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer)
380 {
381     ASSERT(isMainThread());
382     ASSERT_UNUSED(context, context->isDocument());
383     ASSERT(peer);
384
385     peer->close();
386 }
387
388 void WorkerThreadableWebSocketChannel::Bridge::close()
389 {
390     ASSERT(m_peer);
391     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, AllowCrossThreadAccess(m_peer)));
392 }
393
394 void WorkerThreadableWebSocketChannel::mainThreadFail(ScriptExecutionContext* context, Peer* peer, const String& reason)
395 {
396     ASSERT(isMainThread());
397     ASSERT_UNUSED(context, context->isDocument());
398     ASSERT(peer);
399
400     peer->fail(reason);
401 }
402
403 void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason)
404 {
405     ASSERT(m_peer);
406     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadFail, AllowCrossThreadAccess(m_peer), reason));
407 }
408
409 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer)
410 {
411     ASSERT(isMainThread());
412     ASSERT_UNUSED(context, context->isDocument());
413     ASSERT(peer);
414
415     delete peer;
416 }
417
418 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
419 {
420     clearClientWrapper();
421     if (m_peer) {
422         Peer* peer = m_peer;
423         m_peer = 0;
424         m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, AllowCrossThreadAccess(peer)));
425     }
426     m_workerContext = 0;
427 }
428
429 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
430 {
431     ASSERT(isMainThread());
432     ASSERT_UNUSED(context, context->isDocument());
433     ASSERT(peer);
434
435     peer->suspend();
436 }
437
438 void WorkerThreadableWebSocketChannel::Bridge::suspend()
439 {
440     ASSERT(m_peer);
441     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, AllowCrossThreadAccess(m_peer)));
442 }
443
444 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
445 {
446     ASSERT(isMainThread());
447     ASSERT_UNUSED(context, context->isDocument());
448     ASSERT(peer);
449
450     peer->resume();
451 }
452
453 void WorkerThreadableWebSocketChannel::Bridge::resume()
454 {
455     ASSERT(m_peer);
456     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, AllowCrossThreadAccess(m_peer)));
457 }
458
459 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
460 {
461     m_workerClientWrapper->clearClient();
462 }
463
464 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
465 {
466     ASSERT(m_workerClientWrapper);
467     m_workerClientWrapper->clearSyncMethodDone();
468 }
469
470 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
471 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
472 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
473 {
474     if (!m_workerContext)
475         return;
476     WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
477     MessageQueueWaitResult result = MessageQueueMessageReceived;
478     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
479     while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
480         result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null.
481         clientWrapper = m_workerClientWrapper.get();
482     }
483 }
484
485 } // namespace WebCore
486
487 #endif // ENABLE(WEB_SOCKETS)