[Streams API] Implement pulling of a source by a ReadableStream
[WebKit-https.git] / Source / WebCore / Modules / streams / ReadableStream.cpp
1 /*
2  * Copyright (C) 2015 Canon Inc.
3  * Copyright (C) 2015 Igalia S.L.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted, provided that the following conditions
7  * are required to be met:
8  *
9  * 1.  Redistributions of source code must retain the above copyright
10  *     notice, this list of conditions and the following disclaimer.
11  * 2.  Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  * 3.  Neither the name of Canon Inc. nor the names of
15  *     its contributors may be used to endorse or promote products derived
16  *     from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY CANON INC. AND ITS CONTRIBUTORS "AS IS" AND ANY
19  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  * DISCLAIMED. IN NO EVENT SHALL CANON INC. AND ITS CONTRIBUTORS BE LIABLE FOR
22  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29
30 #include "config.h"
31 #include "ReadableStream.h"
32
33 #if ENABLE(STREAMS_API)
34
35 #include "NotImplemented.h"
36 #include "ReadableStreamReader.h"
37 #include <runtime/JSCJSValueInlines.h>
38 #include <wtf/RefCountedLeakCounter.h>
39
40 namespace WebCore {
41
42 DEFINE_DEBUG_ONLY_GLOBAL(WTF::RefCountedLeakCounter, readableStreamCounter, ("ReadableStream"));
43
44 ReadableStream::ReadableStream(ScriptExecutionContext& scriptExecutionContext)
45     : ActiveDOMObject(&scriptExecutionContext)
46 {
47 #ifndef NDEBUG
48     readableStreamCounter.increment();
49 #endif
50     suspendIfNeeded();
51 }
52
53 ReadableStream::~ReadableStream()
54 {
55 #ifndef NDEBUG
56     readableStreamCounter.decrement();
57 #endif
58 }
59
60 void ReadableStream::clearCallbacks()
61 {
62     m_closedSuccessCallback = nullptr;
63     m_closedFailureCallback = nullptr;
64
65     m_readRequests.clear();
66 }
67
68 void ReadableStream::changeStateToClosed()
69 {
70     ASSERT(!m_closeRequested);
71     ASSERT(m_state != State::Errored);
72
73     m_closeRequested = true;
74
75     if (m_state != State::Readable || hasValue())
76         return;
77     close();
78 }
79
80 void ReadableStream::close()
81 {
82     m_state = State::Closed;
83
84     if (m_reader)
85         m_releasedReaders.append(WTF::move(m_reader));
86
87     if (m_closedSuccessCallback)
88         m_closedSuccessCallback();
89
90     for (auto& request : m_readRequests)
91         request.endCallback();
92
93     clearCallbacks();
94 }
95
96 void ReadableStream::changeStateToErrored()
97 {
98     if (m_state != State::Readable)
99         return;
100     m_state = State::Errored;
101
102     if (m_reader)
103         m_releasedReaders.append(WTF::move(m_reader));
104
105     JSC::JSValue error = this->error();
106     if (m_closedFailureCallback)
107         m_closedFailureCallback(error);
108
109     for (auto& request : m_readRequests)
110         request.failureCallback(error);
111
112     clearCallbacks();
113 }
114
115 void ReadableStream::start()
116 {
117     m_isStarted = true;
118     pull();
119 }
120
121 void ReadableStream::pull()
122 {
123     if (!m_isStarted || m_state == State::Closed || m_state == State::Errored || m_closeRequested)
124         return;
125     // FIXME: Implement queueSize check.
126     if (m_readRequests.isEmpty() && hasValue())
127         return;
128     // FIXME: Implement async pull check.
129     doPull();
130 }
131
132 ReadableStreamReader& ReadableStream::getReader()
133 {
134     ASSERT(!m_reader);
135
136     std::unique_ptr<ReadableStreamReader> newReader = std::make_unique<ReadableStreamReader>(*this);
137     ReadableStreamReader& reader = *newReader.get();
138
139     if (m_state == State::Readable) {
140         m_reader = WTF::move(newReader);
141         return reader;
142     }
143
144     m_releasedReaders.append(WTF::move(newReader));
145     return reader;
146 }
147
148 void ReadableStream::closed(ClosedSuccessCallback&& successCallback, FailureCallback&& failureCallback)
149 {
150     if (m_state == State::Closed) {
151         successCallback();
152         return;
153     }
154     if (m_state == State::Errored) {
155         failureCallback(error());
156         return;
157     }
158     m_closedSuccessCallback = WTF::move(successCallback);
159     m_closedFailureCallback = WTF::move(failureCallback);
160 }
161
162 void ReadableStream::read(ReadSuccessCallback&& successCallback, ReadEndCallback&& endCallback, FailureCallback&& failureCallback)
163 {
164     if (m_state == State::Closed) {
165         endCallback();
166         return;
167     }
168     if (m_state == State::Errored) {
169         failureCallback(error());
170         return;
171     }
172     if (hasValue()) {
173         successCallback(read());
174         if (!m_closeRequested)
175             pull();
176         else if (!hasValue())
177             close();
178         return;
179     }
180     m_readRequests.append({ WTF::move(successCallback), WTF::move(endCallback), WTF::move(failureCallback) });
181     pull();
182 }
183
184 bool ReadableStream::resolveReadCallback(JSC::JSValue value)
185 {
186     if (m_readRequests.isEmpty())
187         return false;
188
189     m_readRequests.takeFirst().successCallback(value);
190     return true;
191 }
192
193 const char* ReadableStream::activeDOMObjectName() const
194 {
195     return "ReadableStream";
196 }
197
198 bool ReadableStream::canSuspendForPageCache() const
199 {
200     // FIXME: We should try and do better here.
201     return false;
202 }
203
204 }
205
206 #endif