[Streams API] ReadableJSStream should handle promises returned by JS source start...
[WebKit-https.git] / Source / WebCore / Modules / streams / ReadableStream.cpp
1 /*
2  * Copyright (C) 2015 Canon Inc.
3  * Copyright (C) 2015 Igalia S.L.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted, provided that the following conditions
7  * are required to be met:
8  *
9  * 1.  Redistributions of source code must retain the above copyright
10  *     notice, this list of conditions and the following disclaimer.
11  * 2.  Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  * 3.  Neither the name of Canon Inc. nor the names of
15  *     its contributors may be used to endorse or promote products derived
16  *     from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY CANON INC. AND ITS CONTRIBUTORS "AS IS" AND ANY
19  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  * DISCLAIMED. IN NO EVENT SHALL CANON INC. AND ITS CONTRIBUTORS BE LIABLE FOR
22  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29
30 #include "config.h"
31 #include "ReadableStream.h"
32
33 #if ENABLE(STREAMS_API)
34
35 #include "ReadableStreamReader.h"
36 #include <runtime/JSCJSValueInlines.h>
37 #include <wtf/RefCountedLeakCounter.h>
38
39 namespace WebCore {
40
41 DEFINE_DEBUG_ONLY_GLOBAL(WTF::RefCountedLeakCounter, readableStreamCounter, ("ReadableStream"));
42
43 ReadableStream::ReadableStream(ScriptExecutionContext& scriptExecutionContext)
44     : ActiveDOMObject(&scriptExecutionContext)
45 {
46 #ifndef NDEBUG
47     readableStreamCounter.increment();
48 #endif
49     suspendIfNeeded();
50 }
51
52 ReadableStream::~ReadableStream()
53 {
54 #ifndef NDEBUG
55     readableStreamCounter.decrement();
56 #endif
57 }
58
59 void ReadableStream::clearCallbacks()
60 {
61     m_closedSuccessCallback = nullptr;
62     m_closedFailureCallback = nullptr;
63
64     m_readRequests.clear();
65 }
66
67 void ReadableStream::changeStateToClosed()
68 {
69     ASSERT(!m_closeRequested);
70     ASSERT(m_state != State::Errored);
71
72     m_closeRequested = true;
73
74     if (m_state != State::Readable || hasValue())
75         return;
76     close();
77 }
78
79 void ReadableStream::close()
80 {
81     m_state = State::Closed;
82
83     if (m_reader)
84         m_releasedReaders.append(WTF::move(m_reader));
85
86     if (m_closedSuccessCallback)
87         m_closedSuccessCallback();
88
89     for (auto& request : m_readRequests)
90         request.endCallback();
91
92     clearCallbacks();
93 }
94
95 void ReadableStream::changeStateToErrored()
96 {
97     if (m_state != State::Readable)
98         return;
99     m_state = State::Errored;
100
101     if (m_reader)
102         m_releasedReaders.append(WTF::move(m_reader));
103
104     JSC::JSValue error = this->error();
105     if (m_closedFailureCallback)
106         m_closedFailureCallback(error);
107
108     for (auto& request : m_readRequests)
109         request.failureCallback(error);
110
111     clearCallbacks();
112 }
113
114 void ReadableStream::start()
115 {
116     m_isStarted = true;
117     pull();
118 }
119
120 void ReadableStream::pull()
121 {
122     if (!m_isStarted || m_state == State::Closed || m_state == State::Errored || m_closeRequested)
123         return;
124     // FIXME: Implement queueSize check.
125     if (m_readRequests.isEmpty() && hasValue())
126         return;
127     // FIXME: Implement async pull check.
128     doPull();
129 }
130
131 ReadableStreamReader& ReadableStream::getReader()
132 {
133     ASSERT(!m_reader);
134
135     std::unique_ptr<ReadableStreamReader> newReader = std::make_unique<ReadableStreamReader>(*this);
136     ReadableStreamReader& reader = *newReader.get();
137
138     if (m_state == State::Readable) {
139         m_reader = WTF::move(newReader);
140         return reader;
141     }
142
143     m_releasedReaders.append(WTF::move(newReader));
144     return reader;
145 }
146
147 void ReadableStream::closed(ClosedSuccessCallback&& successCallback, FailureCallback&& failureCallback)
148 {
149     if (m_state == State::Closed) {
150         successCallback();
151         return;
152     }
153     if (m_state == State::Errored) {
154         failureCallback(error());
155         return;
156     }
157     m_closedSuccessCallback = WTF::move(successCallback);
158     m_closedFailureCallback = WTF::move(failureCallback);
159 }
160
161 void ReadableStream::read(ReadSuccessCallback&& successCallback, ReadEndCallback&& endCallback, FailureCallback&& failureCallback)
162 {
163     if (m_state == State::Closed) {
164         endCallback();
165         return;
166     }
167     if (m_state == State::Errored) {
168         failureCallback(error());
169         return;
170     }
171     if (hasValue()) {
172         successCallback(read());
173         if (!m_closeRequested)
174             pull();
175         else if (!hasValue())
176             close();
177         return;
178     }
179     m_readRequests.append({ WTF::move(successCallback), WTF::move(endCallback), WTF::move(failureCallback) });
180     pull();
181 }
182
183 bool ReadableStream::resolveReadCallback(JSC::JSValue value)
184 {
185     if (m_readRequests.isEmpty())
186         return false;
187
188     m_readRequests.takeFirst().successCallback(value);
189     return true;
190 }
191
192 const char* ReadableStream::activeDOMObjectName() const
193 {
194     return "ReadableStream";
195 }
196
197 bool ReadableStream::canSuspendForPageCache() const
198 {
199     // FIXME: We should try and do better here.
200     return false;
201 }
202
203 }
204
205 #endif