3429a6104c971290ced6a213e080b23c12628fda
[WebKit-https.git] / Source / WebCore / Modules / streams / ReadableStreamInternals.js
1 /*
2  * Copyright (C) 2015 Canon Inc. All rights reserved.
3  * Copyright (C) 2015 Igalia.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
15  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
17  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
18  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
19  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
20  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
21  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
22  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26
27 // @conditional=ENABLE(STREAMS_API)
28 // @internal
29
30 function privateInitializeReadableStreamDefaultReader(stream)
31 {
32     "use strict";
33
34     if (!@isReadableStream(stream))
35        @throwTypeError("ReadableStreamDefaultReader needs a ReadableStream");
36     if (@isReadableStreamLocked(stream))
37        @throwTypeError("ReadableStream is locked");
38
39     @readableStreamReaderGenericInitialize(this, stream);
40     @putByIdDirectPrivate(this, "readRequests", []);
41
42     return this;
43 }
44
45 function readableStreamReaderGenericInitialize(reader, stream)
46 {
47     "use strict";
48
49     @putByIdDirectPrivate(reader, "ownerReadableStream", stream);
50     @putByIdDirectPrivate(stream, "reader", reader);
51     if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
52         @putByIdDirectPrivate(reader, "closedPromiseCapability", @newPromiseCapability(@Promise));
53     else if (@getByIdDirectPrivate(stream, "state") === @streamClosed)
54         @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @Promise.@resolve() });
55     else {
56         @assert(@getByIdDirectPrivate(stream, "state") === @streamErrored);
57         @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(@getByIdDirectPrivate(stream, "storedError")) });
58     }
59 }
60
61 function privateInitializeReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)
62 {
63     "use strict";
64
65     if (!@isReadableStream(stream))
66         @throwTypeError("ReadableStreamDefaultController needs a ReadableStream");
67
68     // readableStreamController is initialized with null value.
69     if (@getByIdDirectPrivate(stream, "readableStreamController") !== null)
70         @throwTypeError("ReadableStream already has a controller");
71
72     @putByIdDirectPrivate(this, "controlledReadableStream", stream);
73     @putByIdDirectPrivate(this, "underlyingSource", underlyingSource);
74     @putByIdDirectPrivate(this, "queue", @newQueue());
75     @putByIdDirectPrivate(this, "started", false);
76     @putByIdDirectPrivate(this, "closeRequested", false);
77     @putByIdDirectPrivate(this, "pullAgain", false);
78     @putByIdDirectPrivate(this, "pulling", false);
79     @putByIdDirectPrivate(this, "strategy", @validateAndNormalizeQueuingStrategy(size, highWaterMark));
80
81     const controller = this;
82     @promiseInvokeOrNoopNoCatch(underlyingSource, "start", [this]).@then(() => {
83         @putByIdDirectPrivate(controller, "started", true);
84         @assert(!@getByIdDirectPrivate(controller, "pulling"));
85         @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
86         @readableStreamDefaultControllerCallPullIfNeeded(controller);
87     }, (error) => {
88         if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
89             @readableStreamDefaultControllerError(controller, error);
90     });
91
92     @putByIdDirectPrivate(this, "cancel", @readableStreamDefaultControllerCancel);
93
94     @putByIdDirectPrivate(this, "pull", @readableStreamDefaultControllerPull);
95
96     return this;
97 }
98
99 function readableStreamDefaultControllerError(controller, error)
100 {
101     "use strict";
102
103     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
104     @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
105     @putByIdDirectPrivate(controller, "queue", @newQueue());
106     @readableStreamError(stream, error);
107 }
108
109 function readableStreamPipeTo(stream, sink)
110 {
111     "use strict";
112     @assert(@isReadableStream(stream));
113
114     const reader = new @ReadableStreamDefaultReader(stream);
115
116     @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(() => { }, (e) => { sink.error(e); });
117
118     function doPipe() {
119         @readableStreamDefaultReaderRead(reader).@then(function(result) {
120             if (result.done) {
121                 sink.close();
122                 return;
123             }
124             try {
125                 sink.enqueue(result.value);
126             } catch (e) {
127                 sink.error("ReadableStream chunk enqueueing in the sink failed");
128                 return;
129             }
130             doPipe();
131         });
132     }
133     doPipe();
134 }
135
136 function readableStreamTee(stream, shouldClone)
137 {
138     "use strict";
139
140     @assert(@isReadableStream(stream));
141     @assert(typeof(shouldClone) === "boolean");
142
143     const reader = new @ReadableStreamDefaultReader(stream);
144
145     const teeState = {
146         closedOrErrored: false,
147         canceled1: false,
148         canceled2: false,
149         reason1: @undefined,
150         reason2: @undefined,
151     };
152
153     teeState.cancelPromiseCapability = @newPromiseCapability(@InternalPromise);
154
155     const pullFunction = @readableStreamTeePullFunction(teeState, reader, shouldClone);
156
157     const branch1 = new @ReadableStream({
158         "pull": pullFunction,
159         "cancel": @readableStreamTeeBranch1CancelFunction(teeState, stream)
160     });
161     const branch2 = new @ReadableStream({
162         "pull": pullFunction,
163         "cancel": @readableStreamTeeBranch2CancelFunction(teeState, stream)
164     });
165
166     @getByIdDirectPrivate(reader, "closedPromiseCapability").@promise.@then(@undefined, function(e) {
167         if (teeState.closedOrErrored)
168             return;
169         @readableStreamDefaultControllerError(branch1.@readableStreamController, e);
170         @readableStreamDefaultControllerError(branch2.@readableStreamController, e);
171         teeState.closedOrErrored = true;
172     });
173
174     // Additional fields compared to the spec, as they are needed within pull/cancel functions.
175     teeState.branch1 = branch1;
176     teeState.branch2 = branch2;
177
178     return [branch1, branch2];
179 }
180
181 function doStructuredClone(object)
182 {
183     "use strict";
184
185     // FIXME: We should implement http://w3c.github.io/html/infrastructure.html#ref-for-structured-clone-4
186     // Implementation is currently limited to ArrayBuffer/ArrayBufferView to meet Fetch API needs.
187
188     if (object instanceof @ArrayBuffer)
189         return @structuredCloneArrayBuffer(object);
190
191     if (@ArrayBuffer.@isView(object))
192         return @structuredCloneArrayBufferView(object);
193
194     @throwTypeError("structuredClone not implemented for: " + object);
195 }
196
197 function readableStreamTeePullFunction(teeState, reader, shouldClone)
198 {
199     "use strict";
200
201     return function() {
202         @Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(reader), function(result) {
203             @assert(@isObject(result));
204             @assert(typeof result.done === "boolean");
205             if (result.done && !teeState.closedOrErrored) {
206                 if (!teeState.canceled1)
207                     @readableStreamDefaultControllerClose(teeState.branch1.@readableStreamController);
208                 if (!teeState.canceled2)
209                     @readableStreamDefaultControllerClose(teeState.branch2.@readableStreamController);
210                 teeState.closedOrErrored = true;
211             }
212             if (teeState.closedOrErrored)
213                 return;
214             if (!teeState.canceled1)
215                 @readableStreamDefaultControllerEnqueue(teeState.branch1.@readableStreamController, result.value);
216             if (!teeState.canceled2)
217                 @readableStreamDefaultControllerEnqueue(teeState.branch2.@readableStreamController, shouldClone ? @doStructuredClone(result.value) : result.value);
218         });
219     }
220 }
221
222 function readableStreamTeeBranch1CancelFunction(teeState, stream)
223 {
224     "use strict";
225
226     return function(r) {
227         teeState.canceled1 = true;
228         teeState.reason1 = r;
229         if (teeState.canceled2) {
230             @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
231                 teeState.cancelPromiseCapability.@resolve,
232                 teeState.cancelPromiseCapability.@reject);
233         }
234         return teeState.cancelPromiseCapability.@promise;
235     }
236 }
237
238 function readableStreamTeeBranch2CancelFunction(teeState, stream)
239 {
240     "use strict";
241
242     return function(r) {
243         teeState.canceled2 = true;
244         teeState.reason2 = r;
245         if (teeState.canceled1) {
246             @readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).@then(
247                 teeState.cancelPromiseCapability.@resolve,
248                 teeState.cancelPromiseCapability.@reject);
249         }
250         return teeState.cancelPromiseCapability.@promise;
251     }
252 }
253
254 function isReadableStream(stream)
255 {
256     "use strict";
257
258     // Spec tells to return true only if stream has a readableStreamController internal slot.
259     // However, since it is a private slot, it cannot be checked using hasOwnProperty().
260     // Therefore, readableStreamController is initialized with null value.
261     return @isObject(stream) && @getByIdDirectPrivate(stream, "readableStreamController") !== @undefined;
262 }
263
264 function isReadableStreamDefaultReader(reader)
265 {
266     "use strict";
267
268     // Spec tells to return true only if reader has a readRequests internal slot.
269     // However, since it is a private slot, it cannot be checked using hasOwnProperty().
270     // Since readRequests is initialized with an empty array, the following test is ok.
271     return @isObject(reader) && !!@getByIdDirectPrivate(reader, "readRequests");
272 }
273
274 function isReadableStreamDefaultController(controller)
275 {
276     "use strict";
277
278     // Spec tells to return true only if controller has an underlyingSource internal slot.
279     // However, since it is a private slot, it cannot be checked using hasOwnProperty().
280     // underlyingSource is obtained in ReadableStream constructor: if undefined, it is set
281     // to an empty object. Therefore, following test is ok.
282     return @isObject(controller) && !!@getByIdDirectPrivate(controller, "underlyingSource");
283 }
284
285 function readableStreamError(stream, error)
286 {
287     "use strict";
288
289     @assert(@isReadableStream(stream));
290     @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
291     @putByIdDirectPrivate(stream, "state", @streamErrored);
292     @putByIdDirectPrivate(stream, "storedError", error);
293
294     if (!@getByIdDirectPrivate(stream, "reader"))
295         return;
296
297     const reader = @getByIdDirectPrivate(stream, "reader");
298
299     if (@isReadableStreamDefaultReader(reader)) {
300         const requests = @getByIdDirectPrivate(reader, "readRequests");
301         for (let index = 0, length = requests.length; index < length; ++index)
302             requests[index].@reject.@call(@undefined, error);
303         @putByIdDirectPrivate(reader, "readRequests", []);
304     } else {
305         @assert(@isReadableStreamBYOBReader(reader));
306         const requests = @getByIdDirectPrivate(reader, "readIntoRequests");
307         for (let index = 0, length = requests.length; index < length; ++index)
308             requests[index].@reject.@call(@undefined, error);
309         @putByIdDirectPrivate(reader, "readIntoRequests", []);
310     }
311
312     @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, error);
313     @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "closedPromiseCapability").@promise, "promiseIsHandled", true);
314 }
315
316 function readableStreamDefaultControllerCallPullIfNeeded(controller)
317 {
318     "use strict";
319
320     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
321
322     if (!@readableStreamDefaultControllerCanCloseOrEnqueue(controller))
323         return;
324     if (!@getByIdDirectPrivate(controller, "started"))
325         return;
326     if ((!@isReadableStreamLocked(stream) || !@getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) && @readableStreamDefaultControllerGetDesiredSize(controller) <= 0)
327         return;
328
329     if (@getByIdDirectPrivate(controller, "pulling")) {
330         @putByIdDirectPrivate(controller, "pullAgain", true);
331         return;
332     }
333
334     @assert(!@getByIdDirectPrivate(controller, "pullAgain"));
335     @putByIdDirectPrivate(controller, "pulling", true);
336
337     @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingSource"), "pull", [controller]).@then(function() {
338         @putByIdDirectPrivate(controller, "pulling", false);
339         if (@getByIdDirectPrivate(controller, "pullAgain")) {
340             @putByIdDirectPrivate(controller, "pullAgain", false);
341             @readableStreamDefaultControllerCallPullIfNeeded(controller);
342         }
343     }, function(error) {
344         if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
345             @readableStreamDefaultControllerError(controller, error);
346     });
347 }
348
349 function isReadableStreamLocked(stream)
350 {
351    "use strict";
352
353     @assert(@isReadableStream(stream));
354     return !!@getByIdDirectPrivate(stream, "reader");
355 }
356
357 function readableStreamDefaultControllerGetDesiredSize(controller)
358 {
359    "use strict";
360
361     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
362     const state = @getByIdDirectPrivate(stream, "state");
363
364     if (state === @streamErrored)
365         return null;
366     if (state === @streamClosed)
367         return 0;
368
369     return @getByIdDirectPrivate(controller, "strategy").highWaterMark - @getByIdDirectPrivate(controller, "queue").size;
370 }
371
372
373 function readableStreamReaderGenericCancel(reader, reason)
374 {
375     "use strict";
376
377     const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
378     @assert(!!stream);
379     return @readableStreamCancel(stream, reason);
380 }
381
382 function readableStreamCancel(stream, reason)
383 {
384     "use strict";
385
386     @putByIdDirectPrivate(stream, "disturbed", true);
387     const state = @getByIdDirectPrivate(stream, "state");
388     if (state === @streamClosed)
389         return @Promise.@resolve();
390     if (state === @streamErrored)
391         return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
392     @readableStreamClose(stream);
393     return @getByIdDirectPrivate(stream, "readableStreamController").@cancel(@getByIdDirectPrivate(stream, "readableStreamController"), reason).@then(function() {  });
394 }
395
396 function readableStreamDefaultControllerCancel(controller, reason)
397 {
398     "use strict";
399
400     @putByIdDirectPrivate(controller, "queue", @newQueue());
401     return @promiseInvokeOrNoop(@getByIdDirectPrivate(controller, "underlyingSource"), "cancel", [reason]);
402 }
403
404 function readableStreamDefaultControllerPull(controller)
405 {
406     "use strict";
407
408     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
409     if (@getByIdDirectPrivate(controller, "queue").content.length) {
410         const chunk = @dequeueValue(@getByIdDirectPrivate(controller, "queue"));
411         if (@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(controller, "queue").content.length === 0)
412             @readableStreamClose(stream);
413         else
414             @readableStreamDefaultControllerCallPullIfNeeded(controller);
415         return @Promise.@resolve({value: chunk, done: false});
416     }
417     const pendingPromise = @readableStreamAddReadRequest(stream);
418     @readableStreamDefaultControllerCallPullIfNeeded(controller);
419     return pendingPromise;
420 }
421
422 function readableStreamDefaultControllerClose(controller)
423 {
424     "use strict";
425
426     @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
427     @putByIdDirectPrivate(controller, "closeRequested", true);
428     if (@getByIdDirectPrivate(controller, "queue").content.length === 0)
429         @readableStreamClose(@getByIdDirectPrivate(controller, "controlledReadableStream"));
430 }
431
432 function readableStreamClose(stream)
433 {
434     "use strict";
435
436     @assert(@getByIdDirectPrivate(stream, "state") === @streamReadable);
437     @putByIdDirectPrivate(stream, "state", @streamClosed);
438     const reader = @getByIdDirectPrivate(stream, "reader");
439
440     if (!reader)
441         return;
442
443     if (@isReadableStreamDefaultReader(reader)) {
444         const requests = @getByIdDirectPrivate(reader, "readRequests");
445         for (let index = 0, length = requests.length; index < length; ++index)
446             requests[index].@resolve.@call(@undefined, {value:@undefined, done: true});
447         @putByIdDirectPrivate(reader, "readRequests", []);
448     }
449
450     @getByIdDirectPrivate(reader, "closedPromiseCapability").@resolve.@call();
451 }
452
453 function readableStreamFulfillReadRequest(stream, chunk, done)
454 {
455     "use strict";
456
457     @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").@shift().@resolve.@call(@undefined, {value: chunk, done: done});
458 }
459
460 function readableStreamDefaultControllerEnqueue(controller, chunk)
461 {
462     "use strict";
463
464     const stream = @getByIdDirectPrivate(controller, "controlledReadableStream");
465     @assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
466
467     if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").length) {
468         @readableStreamFulfillReadRequest(stream, chunk, false);
469         @readableStreamDefaultControllerCallPullIfNeeded(controller);
470         return;
471     }
472
473     try {
474         let chunkSize = 1;
475         if (@getByIdDirectPrivate(controller, "strategy").size !== @undefined)
476             chunkSize = @getByIdDirectPrivate(controller, "strategy").size(chunk);
477         @enqueueValueWithSize(@getByIdDirectPrivate(controller, "queue"), chunk, chunkSize);
478     }
479     catch(error) {
480         if (@getByIdDirectPrivate(stream, "state") === @streamReadable)
481             @readableStreamDefaultControllerError(controller, error);
482         throw error;
483     }
484     @readableStreamDefaultControllerCallPullIfNeeded(controller);
485 }
486
487 function readableStreamDefaultReaderRead(reader)
488 {
489     "use strict";
490
491     const stream = @getByIdDirectPrivate(reader, "ownerReadableStream");
492     @assert(!!stream);
493     const state = @getByIdDirectPrivate(stream, "state");
494
495     @putByIdDirectPrivate(stream, "disturbed", true);
496     if (state === @streamClosed)
497         return @Promise.@resolve({value: @undefined, done: true});
498     if (state === @streamErrored)
499         return @Promise.@reject(@getByIdDirectPrivate(stream, "storedError"));
500     @assert(state === @streamReadable);
501
502     return @getByIdDirectPrivate(stream, "readableStreamController").@pull(@getByIdDirectPrivate(stream, "readableStreamController"));
503 }
504
505 function readableStreamAddReadRequest(stream)
506 {
507     "use strict";
508
509     @assert(@isReadableStreamDefaultReader(@getByIdDirectPrivate(stream, "reader")));
510     @assert(@getByIdDirectPrivate(stream, "state") == @streamReadable);
511
512     const readRequest = @newPromiseCapability(@Promise);
513     @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").@push(readRequest);
514
515     return readRequest.@promise;
516 }
517
518 function isReadableStreamDisturbed(stream)
519 {
520     "use strict";
521
522     @assert(@isReadableStream(stream));
523     return @getByIdDirectPrivate(stream, "disturbed");
524 }
525
526 function readableStreamReaderGenericRelease(reader)
527 {
528     "use strict";
529
530     @assert(!!@getByIdDirectPrivate(reader, "ownerReadableStream"));
531     @assert(@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader") === reader);
532
533     if (@getByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === @streamReadable)
534         @getByIdDirectPrivate(reader, "closedPromiseCapability").@reject.@call(@undefined, new @TypeError("releasing lock of reader whose stream is still in readable state"));
535     else
536         @putByIdDirectPrivate(reader, "closedPromiseCapability", { @promise: @newHandledRejectedPromise(new @TypeError("reader released lock")) });
537
538     @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "closedPromiseCapability").@promise, "promiseIsHandled", true);
539     @putByIdDirectPrivate(@getByIdDirectPrivate(reader, "ownerReadableStream"), "reader", @undefined);
540     @putByIdDirectPrivate(reader, "ownerReadableStream", @undefined);
541 }
542
543 function readableStreamDefaultControllerCanCloseOrEnqueue(controller)
544 {
545     "use strict";
546
547     return !@getByIdDirectPrivate(controller, "closeRequested") && @getByIdDirectPrivate(@getByIdDirectPrivate(controller, "controlledReadableStream"), "state") === @streamReadable;
548 }