[Streams API] streams should not directly use Number and related methods
[WebKit-https.git] / Source / WebCore / Modules / streams / ReadableStreamInternals.js
1 /*
2  * Copyright (C) 2015 Canon Inc. All rights reserved.
3  * Copyright (C) 2015 Igalia.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
15  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
17  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
18  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
19  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
20  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
21  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
22  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26
27 // @conditional=ENABLE(STREAMS_API)
28 // @internal
29
30 function privateInitializeReadableStreamReader(stream)
31 {
32     "use strict";
33
34     if (!@isReadableStream(stream))
35        throw new @TypeError("ReadableStreamReader needs a ReadableStream");
36     if (@isReadableStreamLocked(stream))
37        throw new @TypeError("ReadableStream is locked");
38
39     this.@readRequests = [];
40     this.@ownerReadableStream = stream;
41     stream.@reader = this;
42     if (stream.@state === @streamReadable) {
43         this.@closedPromiseCapability = @newPromiseCapability(@Promise);
44         return this;
45     }
46     if (stream.@state === @streamClosed) {
47         this.@closedPromiseCapability = { @promise: @Promise.@resolve() };
48         return this;
49     }
50     @assert(stream.@state === @streamErrored);
51     this.@closedPromiseCapability = { @promise: @Promise.@reject(stream.@storedError) };
52
53     return this;
54 }
55
56 function privateInitializeReadableStreamController(stream)
57 {
58     "use strict";
59
60     if (!@isReadableStream(stream))
61         throw new @TypeError("ReadableStreamController needs a ReadableStream");
62     if (typeof stream.@controller !== "undefined")
63         throw new @TypeError("ReadableStream already has a controller");
64     this.@controlledReadableStream = stream;
65
66     return this;
67 }
68
69 function teeReadableStream(stream, shouldClone)
70 {
71     "use strict";
72
73     @assert(@isReadableStream(stream));
74     @assert(typeof(shouldClone) === "boolean");
75
76     const reader = stream.getReader();
77
78     const teeState = {
79         closedOrErrored: false,
80         canceled1: false,
81         canceled2: false,
82         reason1: undefined,
83         reason: undefined,
84     };
85
86     teeState.cancelPromiseCapability = @newPromiseCapability(@InternalPromise);
87
88     const pullFunction = @teeReadableStreamPullFunction(teeState, reader, shouldClone);
89
90     const branch1 = new ReadableStream({
91         "pull": pullFunction,
92         "cancel": @teeReadableStreamBranch1CancelFunction(teeState, stream)
93     });
94     const branch2 = new ReadableStream({
95         "pull": pullFunction,
96         "cancel": @teeReadableStreamBranch2CancelFunction(teeState, stream)
97     });
98
99     @Promise.prototype.@then.@call(reader.closed, undefined, function(e) {
100         if (teeState.closedOrErrored)
101             return;
102         @errorReadableStream(branch1, e);
103         @errorReadableStream(branch2, e);
104         teeState.closedOrErrored = true;
105     });
106
107     // Additional fields compared to the spec, as they are needed within pull/cancel functions.
108     teeState.branch1 = branch1;
109     teeState.branch2 = branch2;
110
111     return [branch1, branch2];
112 }
113
114 function teeReadableStreamPullFunction(teeState, reader, shouldClone)
115 {
116     "use strict";
117
118     return function() {
119         @Promise.prototype.@then.@call(reader.read(), function(result) {
120             @assert(@isObject(result));
121             @assert(typeof result.done === "boolean");
122             if (result.done && !teeState.closedOrErrored) {
123                 @closeReadableStream(teeState.branch1);
124                 @closeReadableStream(teeState.branch2);
125                 teeState.closedOrErrored = true;
126             }
127             if (teeState.closedOrErrored)
128                 return;
129             if (!teeState.canceled1) {
130                 // FIXME: Implement cloning if shouldClone is true
131                 @enqueueInReadableStream(teeState.branch1, result.value);
132             }
133             if (!teeState.canceled2) {
134                 // FIXME: Implement cloning if shouldClone is true
135                 @enqueueInReadableStream(teeState.branch2, result.value);
136             }
137         });
138     }
139 }
140
141 function teeReadableStreamBranch1CancelFunction(teeState, stream)
142 {
143     "use strict";
144
145     return function(r) {
146         teeState.canceled1 = true;
147         teeState.reason1 = r;
148         if (teeState.canceled2) {
149             @Promise.prototype.@then.@call(@cancelReadableStream(stream, [teeState.reason1, teeState.reason2]),
150                                            teeState.cancelPromiseCapability.@resolve,
151                                            teeState.cancelPromiseCapability.@reject);
152         }
153         return teeState.cancelPromiseCapability.@promise;
154     }
155 }
156
157 function teeReadableStreamBranch2CancelFunction(teeState, stream)
158 {
159     "use strict";
160
161     return function(r) {
162         teeState.canceled2 = true;
163         teeState.reason2 = r;
164         if (teeState.canceled1) {
165             @Promise.prototype.@then.@call(@cancelReadableStream(stream, [teeState.reason1, teeState.reason2]),
166                                            teeState.cancelPromiseCapability.@resolve,
167                                            teeState.cancelPromiseCapability.@reject);
168         }
169         return teeState.cancelPromiseCapability.@promise;
170     }
171 }
172
173 function isReadableStream(stream)
174 {
175     "use strict";
176
177     return @isObject(stream) && !!stream.@underlyingSource;
178 }
179
180 function isReadableStreamReader(reader)
181 {
182     "use strict";
183
184     // To reset @ownerReadableStream it must be set to null instead of undefined because there is no way to distinguish
185     // between a non-existent slot and an slot set to undefined.
186     return @isObject(reader) && typeof reader.@ownerReadableStream !== "undefined";
187 }
188
189 function isReadableStreamController(controller)
190 {
191     "use strict";
192
193     return @isObject(controller) && !!controller.@controlledReadableStream;
194 }
195
196 function errorReadableStream(stream, error)
197 {
198     "use strict";
199
200     @assert(stream.@state === @streamReadable);
201     stream.@queue = @newQueue();
202     stream.@storedError = error;
203     stream.@state = @streamErrored;
204
205     if (!stream.@reader)
206         return;
207     const reader = stream.@reader;
208
209     const requests = reader.@readRequests;
210     for (let index = 0, length = requests.length; index < length; ++index)
211         requests[index].@reject.@call(undefined, error);
212     reader.@readRequests = [];
213
214     reader.@closedPromiseCapability.@reject.@call(undefined, error);
215 }
216
217 function requestReadableStreamPull(stream)
218 {
219     "use strict";
220
221     if (stream.@state === @streamClosed || stream.@state === @streamErrored)
222         return;
223     if (stream.@closeRequested)
224         return;
225     if (!stream.@started)
226         return;
227     if ((!@isReadableStreamLocked(stream) || !stream.@reader.@readRequests.length) && @getReadableStreamDesiredSize(stream) <= 0)
228         return;
229  
230     if (stream.@pulling) {
231         stream.@pullAgain = true;
232         return;
233     }
234
235     stream.@pulling = true;
236
237     const promise = @promiseInvokeOrNoop(stream.@underlyingSource, "pull", [stream.@controller]);
238     @Promise.prototype.@then.@call(promise, function() {
239         stream.@pulling = false;
240         if (stream.@pullAgain) {
241             stream.@pullAgain = false;
242             @requestReadableStreamPull(stream);
243         }
244     }, function(error) {
245         if (stream.@state === @streamReadable)
246             @errorReadableStream(stream, error);
247     });
248 }
249
250 function isReadableStreamLocked(stream)
251 {
252    "use strict";
253
254     @assert(@isReadableStream(stream));
255     return !!stream.@reader;
256 }
257
258 function getReadableStreamDesiredSize(stream)
259 {
260    "use strict";
261
262    return stream.@strategy.highWaterMark - stream.@queue.size;
263 }
264
265 function cancelReadableStream(stream, reason)
266 {
267     "use strict";
268
269     stream.@disturbed = true;
270     if (stream.@state === @streamClosed)
271         return @Promise.@resolve();
272     if (stream.@state === @streamErrored)
273         return @Promise.@reject(stream.@storedError);
274     stream.@queue = @newQueue();
275     @finishClosingReadableStream(stream);
276     return @Promise.prototype.@then.@call(@promiseInvokeOrNoop(stream.@underlyingSource, "cancel", [reason]), function() { });
277 }
278
279 function finishClosingReadableStream(stream)
280 {
281     "use strict";
282
283     @assert(stream.@state ===  @streamReadable);
284     stream.@state = @streamClosed;
285     const reader = stream.@reader;
286     if (!reader)
287         return;
288
289     const requests = reader.@readRequests;
290     for (let index = 0, length = requests.length; index < length; ++index)
291         requests[index].@resolve.@call(undefined, {value:undefined, done: true});
292     reader.@readRequests = [];
293     reader.@closedPromiseCapability.@resolve.@call();
294 }
295
296 function closeReadableStream(stream)
297 {
298     "use strict";
299
300     @assert(!stream.@closeRequested);
301     @assert(stream.@state !== @streamErrored);
302     if (stream.@state === @streamClosed)
303         return; 
304     stream.@closeRequested = true;
305     if (!stream.@queue.content.length)
306         @finishClosingReadableStream(stream);
307 }
308
309 function enqueueInReadableStream(stream, chunk)
310 {
311     "use strict";
312
313     @assert(!stream.@closeRequested);
314     @assert(stream.@state !== @streamErrored);
315     if (stream.@state === @streamClosed)
316         return;
317     if (@isReadableStreamLocked(stream) && stream.@reader.@readRequests.length) {
318         stream.@reader.@readRequests.shift().@resolve.@call(undefined, {value: chunk, done: false});
319         @requestReadableStreamPull(stream);
320         return;
321     }
322     try {
323         let size = 1;
324         if (stream.@strategy.size) {
325             size = @Number(stream.@strategy.size(chunk));
326             if (!@isFinite(size) || size < 0)
327                 throw new @RangeError("Chunk size is not valid");
328         }
329         @enqueueValueWithSize(stream.@queue, chunk, size);
330     }
331     catch(error) {
332         @errorReadableStream(stream, error);
333         throw error;
334     }
335     @requestReadableStreamPull(stream);
336 }
337
338 function readFromReadableStreamReader(reader)
339 {
340     "use strict";
341
342     const stream = reader.@ownerReadableStream;
343     @assert(!!stream);
344     stream.@disturbed = true;
345     if (stream.@state === @streamClosed)
346         return @Promise.@resolve({value: undefined, done: true});
347     if (stream.@state === @streamErrored)
348         return @Promise.@reject(stream.@storedError);
349     @assert(stream.@state === @streamReadable);
350     if (stream.@queue.content.length) {
351         const chunk = @dequeueValue(stream.@queue);
352         if (stream.@closeRequested && stream.@queue.content.length === 0)
353             @finishClosingReadableStream(stream);
354         else
355             @requestReadableStreamPull(stream);
356         return @Promise.@resolve({value: chunk, done: false});
357     }
358     const readPromiseCapability = @newPromiseCapability(@Promise);
359     reader.@readRequests.push(readPromiseCapability);
360     @requestReadableStreamPull(stream);
361     return readPromiseCapability.@promise;
362 }
363
364 function isReadableStreamDisturbed(stream)
365 {
366     "use strict";
367
368     @assert(@isReadableStream(stream));
369     return stream.@disturbed;
370 }