readableStreamDefaultControllerError should return early if stream is not readable
[WebKit-https.git] / Source / WebCore / Modules / streams / ReadableStreamInternals.js
1 /*
2  * Copyright (C) 2015 Canon Inc. All rights reserved.
3  * Copyright (C) 2015 Igalia.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
15  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
17  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
18  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
19  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
20  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
21  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
22  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26
27 // @conditional=ENABLE(STREAMS_API)
28 // @internal
29
30 function privateInitializeReadableStreamDefaultReader(stream)
31 {
32     "use strict";
33
34     if (!@isReadableStream(stream))
35        @throwTypeError("ReadableStreamDefaultReader needs a ReadableStream");
36     if (@isReadableStreamLocked(stream))
37        @throwTypeError("ReadableStream is locked");
38
39     @readableStreamReaderGenericInitialize(this, stream);
40     @putByIdDirectPrivate(this, "readRequests", []);
41
42     return this;
43 }
44
45 function readableStreamReaderGenericInitialize(reader, stream)
46 {
47     "use strict";
48
49     @putByIdDirectPrivate(reader, "ownerReadableStream", stream);
50     @putByIdDirectPrivate(stream, "reader", reader);
51     if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
52         @putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise));
53     else if (@getByIdDirectPrivate(stream, "state") === @streamClosed)
54         @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() });
55     else {
56         @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored);
57         @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) });
58     }
59 }
60
61 function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)
62 {
63     "use strict";
64
65     if (!@isReadableStream(stream))
66         @throwTypeError("ReadableStreamDefaultController needs a ReadableStream");
67
68     // readableStreamController is initialized with null value.
69     if (@getByIdDirectPrivate(stream, "readableStreamController") !== null)
70         @throwTypeError("ReadableStream already has a controller");
71
72     @putByIdDirectPrivate(this, "controlledReadableStream", stream);
73     @putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
74     @putByIdDirectPrivate(this, "queue", @newQueue());
75     @putByIdDirectPrivate(this, "started", false);
76     @putByIdDirectPrivate(this, "closeRequested", false);
77     @putByIdDirectPrivate(this, "pullAgain", false);
78     @putByIdDirectPrivate(this, "pulling", false);
79     @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark));
80
81     const controller = this;
82     @promiseInvokeOrNoopNoCatch(underlyingSource, "start", [this]).@then(() => {
83         @putByIdDirectPrivate(controller, "started", true);
84         @assert(!@getByIdDirectPrivate(controller, "pulling"));
85         @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
86         @readableStreamDefaultControllerCallPullIfNeeded(controller);
87     }, (error) => {
88         @readableStreamDefaultControllerError(controller, error);
89     });
90
91     @putByIdDirectPrivate(this, "cancel", @readableStreamDefaultControllerCancel);
92
93     @putByIdDirectPrivate(this, "pull", @readableStreamDefaultControllerPull);
94
95     return this;
96 }
97
98 function readableStreamDefaultControllerError(controller, error)
99 {
100     "use strict";
101
102     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
103     if (@getByIdDirectPrivate(stream, "state") !== @streamReadable)
104         return;
105     @putByIdDirectPrivate(controller, "queue", @newQueue());
106     @readableStreamError(stream, error);
107 }
108
109 function readableStreamPipeTo(stream, sink)
110 {
111     "use strict";
112     @assert(@isReadableStream(stream));
113
114     const reader = new @ReadableStreamDefaultReader(stream);
115
116     @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); });
117
118     function doPipe() {
119         @readableStreamDefaultReaderRead(reader).@then(function(result) {
120             if (result.done) {
121                 sink.close();
122                 return;
123             }
124             try {
125                 sink.enqueue(result.value);
126             } catch (e) {
127                 sink.error("ReadableStream chunk enqueueing in the sink failed");
128                 return;
129             }
130             doPipe();
131         });
132     }
133     doPipe();
134 }
135
136 function readableStreamTee(stream, shouldClone)
137 {
138     "use strict";
139
140     @assert(@isReadableStream(stream));
141     @assert(typeof(shouldClone) === "boolean");
142
143     const reader = new @ReadableStreamDefaultReader(stream);
144
145     const teeState = {
146         closedOrErrored: false,
147         canceled1: false,
148         canceled2: false,
149         reason1: @undefined,
150         reason2: @undefined,
151     };
152
153     teeState.cancelPromiseCapability = @newPromiseCapability(@InternalPromise);
154
155     const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone);
156
157     const branch1 = new @ReadableStream({
158         "pull": pullFunction,
159         "cancel": @readableStreamTeeBranch1CancelFunction(teeState, stream)
160     });
161     const branch2 = new @ReadableStream({
162         "pull": pullFunction,
163         "cancel": @readableStreamTeeBranch2CancelFunction(teeState, stream)
164     });
165
166     @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) {
167         if (teeState.closedOrErrored)
168             return;
169         @readableStreamDefaultControllerError(branch1.@readableStreamController, e);
170         @readableStreamDefaultControllerError(branch2.@readableStreamController, e);
171         teeState.closedOrErrored = true;
172     });
173
174     // Additional fields compared to the spec, as they are needed within pull/cancel functions.
175     teeState.branch1 = branch1;
176     teeState.branch2 = branch2;
177
178     return [branch1, branch2];
179 }
180
181 function doStructuredClone(object)
182 {
183     "use strict";
184
185     // FIXME: We should implement http://w3c.github.io/html/infrastructure.html#ref-for-structured-clone-4
186     // Implementation is currently limited to ArrayBuffer/ArrayBufferView to meet Fetch API needs.
187
188     if (object instanceof @ArrayBuffer)
189         return @structuredCloneArrayBuffer(object);
190
191     if (@ArrayBuffer.@isView(object))
192         return @structuredCloneArrayBufferView(object);
193
194     @throwTypeError("structuredClone not implemented for: " + object);
195 }
196
197 function readableStreamTeePullFunction(teeState, reader, shouldClone)
198 {
199     "use strict";
200
201     return function() {
202         @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) {
203             @assert(@isObject(result));
204             @assert(typeof result.done === "boolean");
205             if (result.done && !teeState.closedOrErrored) {
206                 if (!teeState.canceled1)
207                     @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController);
208                 if (!teeState.canceled2)
209                     @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController);
210                 teeState.closedOrErrored = true;
211             }
212             if (teeState.closedOrErrored)
213                 return;
214             if (!teeState.canceled1)
215                 @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value);
216             if (!teeState.canceled2)
217                 @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @doStructuredClone(result.value) : result.value);
218         });
219     }
220 }
221
222 function readableStreamTeeBranch1CancelFunction(teeState, stream)
223 {
224     "use strict";
225
226     return function(r) {
227         teeState.canceled1 = true;
228         teeState.reason1 = r;
229         if (teeState.canceled2) {
230             @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
231                 teeState.cancelPromiseCapability.@resolve,
232                 teeState.cancelPromiseCapability.@reject);
233         }
234         return teeState.cancelPromiseCapability.@promise;
235     }
236 }
237
238 function readableStreamTeeBranch2CancelFunction(teeState, stream)
239 {
240     "use strict";
241
242     return function(r) {
243         teeState.canceled2 = true;
244         teeState.reason2 = r;
245         if (teeState.canceled1) {
246             @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
247                 teeState.cancelPromiseCapability.@resolve,
248                 teeState.cancelPromiseCapability.@reject);
249         }
250         return teeState.cancelPromiseCapability.@promise;
251     }
252 }
253
254 function isReadableStream(stream)
255 {
256     "use strict";
257
258     // Spec tells to return true only if stream has a readableStreamController internal slot.
259     // However, since it is a private slot, it cannot be checked using hasOwnProperty().
260     // Therefore, readableStreamController is initialized with null value.
261     return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined;
262 }
263
264 function isReadableStreamDefaultReader(reader)
265 {
266     "use strict";
267
268     // Spec tells to return true only if reader has a readRequests internal slot.
269     // However, since it is a private slot, it cannot be checked using hasOwnProperty().
270     // Since readRequests is initialized with an empty array, the following test is ok.
271     return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests");
272 }
273
274 function isReadableStreamDefaultController(controller)
275 {
276     "use strict";
277
278     // Spec tells to return true only if controller has an underlyingSource internal slot.
279     // However, since it is a private slot, it cannot be checked using hasOwnProperty().
280     // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set
281     // to an empty object. Therefore, following test is ok.
282     return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource");
283 }
284
285 function readableStreamError(stream, error)
286 {
287     "use strict";
288
289     @assert(@isReadableStream(stream));
290     @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
291     @putByIdDirectPrivate(stream, "state", @streamErrored);
292     @putByIdDirectPrivate(stream, "storedError", error);
293
294     if (!@getByIdDirectPrivate(stream, "reader"))
295         return;
296
297     const reader = @getByIdDirectPrivate(stream, "reader");
298
299     if (@isReadableStreamDefaultReader(reader)) {
300         const requests = @getByIdDirectPrivate(reader, "readRequests");
301         for (let index = 0, length = requests.length; index < length; ++index)
302             requests[index].@reject.@call(@undefined, error);
303         @putByIdDirectPrivate(reader, "readRequests", []);
304     } else {
305         @assert(@isReadableStreamBYOBReader(reader));
306         const requests = @getByIdDirectPrivate(reader, "readIntoRequests");
307         for (let index = 0, length = requests.length; index < length; ++index)
308             requests[index].@reject.@call(@undefined, error);
309         @putByIdDirectPrivate(reader, "readIntoRequests", []);
310     }
311
312     @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error);
313     @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "closedPromiseCapability").@promise, "promiseIsHandled", true);
314 }
315
316 function readableStreamDefaultControllerCallPullIfNeeded(controller)
317 {
318     "use strict";
319
320     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
321
322     if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
323         return;
324     if (!@getByIdDirectPrivate(controller, "started"))
325         return;
326     if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
327         return;
328
329     if (@getByIdDirectPrivate(controller, "pulling")) {
330         @putByIdDirectPrivate(controller, "pullAgain", true);
331         return;
332     }
333
334     @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
335     @putByIdDirectPrivate(controller, "pulling", true);
336
337     @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingSource"), "pull", [controller]).@then(function() {
338         @putByIdDirectPrivate(controller, "pulling", false);
339         if (@getByIdDirectPrivate(controller, "pullAgain")) {
340             @putByIdDirectPrivate(controller, "pullAgain", false);
341             @readableStreamDefaultControllerCallPullIfNeeded(controller);
342         }
343     }, function(error) {
344         @readableStreamDefaultControllerError(controller, error);
345     });
346 }
347
348 function isReadableStreamLocked(stream)
349 {
350    "use strict";
351
352     @assert(@isReadableStream(stream));
353     return !!@getByIdDirectPrivate(stream, "reader");
354 }
355
356 function readableStreamDefaultControllerGetDesiredSize(controller)
357 {
358    "use strict";
359
360     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
361     const state = @getByIdDirectPrivate(stream, "state");
362
363     if (state === @streamErrored)
364         return null;
365     if (state === @streamClosed)
366         return 0;
367
368     return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size;
369 }
370
371
372 function readableStreamReaderGenericCancel(reader, reason)
373 {
374     "use strict";
375
376     const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
377     @assert(!!stream);
378     return @readableStreamCancel(stream, reason);
379 }
380
381 function readableStreamCancel(stream, reason)
382 {
383     "use strict";
384
385     @putByIdDirectPrivate(stream, "disturbed", true);
386     const state = @getByIdDirectPrivate(stream, "state");
387     if (state === @streamClosed)
388         return @Promise.@resolve();
389     if (state === @streamErrored)
390         return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
391     @readableStreamClose(stream);
392     return @getByIdDirectPrivate(stream, "readableStreamController").@cancel(@getByIdDirectPrivate(stream, "readableStreamController"), reason).@then(function() {  });
393 }
394
395 function readableStreamDefaultControllerCancel(controller, reason)
396 {
397     "use strict";
398
399     @putByIdDirectPrivate(controller, "queue", @newQueue());
400     return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingSource"), "cancel", [reason]);
401 }
402
403 function readableStreamDefaultControllerPull(controller)
404 {
405     "use strict";
406
407     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
408     if (@getByIdDirectPrivate(controller, "queue").content.length) {
409         const chunk = @dequeueValue(@getByIdDirectPrivate(controller, "queue"));
410         if (@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(controller, "queue").content.length === 0)
411             @readableStreamClose(stream);
412         else
413             @readableStreamDefaultControllerCallPullIfNeeded(controller);
414         return @Promise.@resolve({value: chunk, done: false});
415     }
416     const pendingPromise = @readableStreamAddReadRequest(stream);
417     @readableStreamDefaultControllerCallPullIfNeeded(controller);
418     return pendingPromise;
419 }
420
421 function readableStreamDefaultControllerClose(controller)
422 {
423     "use strict";
424
425     @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
426     @putByIdDirectPrivate(controller, "closeRequested", true);
427     if (@getByIdDirectPrivate(controller, "queue").content.length === 0)
428         @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
429 }
430
431 function readableStreamClose(stream)
432 {
433     "use strict";
434
435     @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
436     @putByIdDirectPrivate(stream, "state", @streamClosed);
437     const reader = @getByIdDirectPrivate(stream, "reader");
438
439     if (!reader)
440         return;
441
442     if (@isReadableStreamDefaultReader(reader)) {
443         const requests = @getByIdDirectPrivate(reader, "readRequests");
444         for (let index = 0, length = requests.length; index < length; ++index)
445             requests[index].@resolve.@call(@undefined, {value:@undefined, done: true});
446         @putByIdDirectPrivate(reader, "readRequests", []);
447     }
448
449     @getByIdDirectPrivate(reader, "closedPromiseCapability").@resolve.@call();
450 }
451
452 function readableStreamFulfillReadRequest(stream, chunk, done)
453 {
454     "use strict";
455
456     @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").@shift().@resolve.@call(@undefined, {value: chunk, done: done});
457 }
458
459 function readableStreamDefaultControllerEnqueue(controller, chunk)
460 {
461     "use strict";
462
463     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
464     @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
465
466     if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) {
467         @readableStreamFulfillReadRequest(stream, chunk, false);
468         @readableStreamDefaultControllerCallPullIfNeeded(controller);
469         return;
470     }
471
472     try {
473         let chunkSize = 1;
474         if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined)
475             chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk);
476         @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
477     }
478     catch(error) {
479         @readableStreamDefaultControllerError(controller, error);
480         throw error;
481     }
482     @readableStreamDefaultControllerCallPullIfNeeded(controller);
483 }
484
485 function readableStreamDefaultReaderRead(reader)
486 {
487     "use strict";
488
489     const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
490     @assert(!!stream);
491     const state = @getByIdDirectPrivate(stream, "state");
492
493     @putByIdDirectPrivate(stream, "disturbed", true);
494     if (state === @streamClosed)
495         return @Promise.@resolve({value: @undefined, done: true});
496     if (state === @streamErrored)
497         return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
498     @assert(state === @streamReadable);
499
500     return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController"));
501 }
502
503 function readableStreamAddReadRequest(stream)
504 {
505     "use strict";
506
507     @assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader")));
508     @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable);
509
510     const readRequest = @newPromiseCapability(@Promise);
511     @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").@push(readRequest);
512
513     return readRequest.@promise;
514 }
515
516 function isReadableStreamDisturbed(stream)
517 {
518     "use strict";
519
520     @assert(@isReadableStream(stream));
521     return @getByIdDirectPrivate(stream, "disturbed");
522 }
523
524 function readableStreamReaderGenericRelease(reader)
525 {
526     "use strict";
527
528     @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream"));
529     @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader);
530
531     if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable)
532         @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, new @TypeError("releasing lock of reader whose stream is still in readable state"));
533     else
534         @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(new @TypeError("reader released lock")) });
535
536     @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "closedPromiseCapability").@promise, "promiseIsHandled", true);
537     @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined);
538     @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined);
539 }
540
541 function readableStreamDefaultControllerCanCloseOrEnqueue(controller)
542 {
543     "use strict";
544
545     return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable;
546 }