[Readable Streams API] Implement ReadableStreamBYOBRequest respond() (readable stream...
[WebKit-https.git] / Source / WebCore / Modules / streams / ReadableByteStreamInternals.js
1 /*
2  * Copyright (C) 2016 Canon Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
14  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
17  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
18  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
19  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
21  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
23  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 // @conditional=ENABLE(READABLE_STREAM_API) && ENABLE(READABLE_BYTE_STREAM_API)
27 // @internal
28
29 function privateInitializeReadableByteStreamController(stream, underlyingByteSource, highWaterMark)
30 {
31     "use strict";
32
33     if (!@isReadableStream(stream))
34         @throwTypeError("ReadableByteStreamController needs a ReadableStream");
35
36     // readableStreamController is initialized with null value.
37     if (stream.@readableStreamController !== null)
38         @throwTypeError("ReadableStream already has a controller");
39
40     this.@controlledReadableStream = stream;
41     this.@underlyingByteSource = underlyingByteSource;
42     this.@pullAgain = false;
43     this.@pulling = false;
44     @readableByteStreamControllerClearPendingPullIntos(this);
45     this.@queue = [];
46     this.@totalQueuedBytes = 0;
47     this.@started = false;
48     this.@closeRequested = false;
49
50     let hwm = @Number(highWaterMark);
51     if (@isNaN(hwm) || hwm < 0)
52         @throwRangeError("highWaterMark value is negative or not a number");
53     this.@strategyHWM = hwm;
54
55     let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
56     if (autoAllocateChunkSize !== @undefined) {
57         autoAllocateChunkSize = @Number(autoAllocateChunkSize);
58         if (autoAllocateChunkSize <= 0 || autoAllocateChunkSize === @Number.POSITIVE_INFINITY || autoAllocateChunkSize === @Number.NEGATIVE_INFINITY)
59             @throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity");
60     }
61     this.@autoAllocateChunkSize = autoAllocateChunkSize;
62     this.@pendingPullIntos = [];
63
64     const controller = this;
65     const startResult = @promiseInvokeOrNoopNoCatch(underlyingByteSource, "start", [this]).@then(() => {
66         controller.@started = true;
67         @assert(!controller.@pulling);
68         @assert(!controller.@pullAgain);
69         @readableByteStreamControllerCallPullIfNeeded(controller);
70     }, (error) => {
71         if (stream.@state === @streamReadable)
72             @readableByteStreamControllerError(controller, error);
73     });
74
75     this.@cancel = @readableByteStreamControllerCancel;
76     this.@pull = @readableByteStreamControllerPull;
77
78     return this;
79 }
80
81 function privateInitializeReadableStreamBYOBRequest(controller, view)
82 {
83     "use strict";
84
85     this.@associatedReadableByteStreamController = controller;
86     this.@view = view;
87 }
88
89 function isReadableByteStreamController(controller)
90 {
91     "use strict";
92
93     // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
94     // See corresponding function for explanations.
95     return @isObject(controller) && !!controller.@underlyingByteSource;
96 }
97
98 function isReadableStreamBYOBRequest(byobRequest)
99 {
100     "use strict";
101
102     // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
103     // See corresponding function for explanations.
104     return @isObject(byobRequest) && !!byobRequest.@associatedReadableByteStreamController;
105 }
106
107 function isReadableStreamBYOBReader(reader)
108 {
109     "use strict";
110
111     // FIXME: Since BYOBReader is not yet implemented, always return false.
112     // To be implemented at the same time as BYOBReader (see isReadableStreamDefaultReader
113     // to apply same model).
114     return false;
115 }
116
117 function readableByteStreamControllerCancel(controller, reason)
118 {
119     "use strict";
120
121     if (controller.@pendingPullIntos.length > 0)
122         controller.@pendingPullIntos[0].bytesFilled = 0;
123     controller.@queue = [];
124     controller.@totalQueuedBytes = 0;
125     return @promiseInvokeOrNoop(controller.@underlyingByteSource, "cancel", [reason]);
126 }
127
128 function readableByteStreamControllerError(controller, e)
129 {
130     "use strict";
131
132     @assert(controller.@controlledReadableStream.@state === @streamReadable);
133     @readableByteStreamControllerClearPendingPullIntos(controller);
134     controller.@queue = [];
135     @readableStreamError(controller.@controlledReadableStream, e);
136 }
137
138 function readableByteStreamControllerClose(controller)
139 {
140     "use strict";
141
142     @assert(!controller.@closeRequested);
143     @assert(controller.@controlledReadableStream.@state === @streamReadable);
144
145     if (controller.@totalQueuedBytes > 0) {
146         controller.@closeRequested = true;
147         return;
148     }
149
150     if (controller.@pendingPullIntos.length > 0) {
151         if (controller.@pendingPullIntos[0].bytesFilled > 0) {
152             const e = new @TypeError("Close requested while there remain pending bytes");
153             @readableByteStreamControllerError(controller, e);
154             throw e;
155         }
156     }
157
158     @readableStreamClose(controller.@controlledReadableStream);
159 }
160
161 function readableByteStreamControllerClearPendingPullIntos(controller)
162 {
163     "use strict";
164
165     // FIXME: To be implemented in conjunction with ReadableStreamBYOBRequest.
166 }
167
168 function readableByteStreamControllerGetDesiredSize(controller)
169 {
170    "use strict";
171
172    return controller.@strategyHWM - controller.@totalQueuedBytes;
173 }
174
175 function readableStreamHasBYOBReader(stream)
176 {
177     "use strict";
178
179     return stream.@reader !== @undefined && @isReadableStreamBYOBReader(stream.@reader);
180 }
181
182 function readableStreamHasDefaultReader(stream)
183 {
184     "use strict";
185
186     return stream.@reader !== @undefined && @isReadableStreamDefaultReader(stream.@reader);
187 }
188
189 function readableByteStreamControllerHandleQueueDrain(controller) {
190
191     "use strict";
192
193     @assert(controller.@controlledReadableStream.@state === @streamReadable);
194     if (!controller.@totalQueuedBytes && controller.@closeRequested)
195         @readableStreamClose(controller.@controlledReadableStream);
196     else
197         @readableByteStreamControllerCallPullIfNeeded(controller);
198 }
199
200 function readableByteStreamControllerPull(controller)
201 {
202     "use strict";
203
204     const stream = controller.@controlledReadableStream;
205     @assert(@readableStreamHasDefaultReader(stream));
206
207     if (controller.@totalQueuedBytes > 0) {
208         @assert(stream.@reader.@readRequests.length === 0);
209         const entry = controller.@queue.@shift();
210         controller.@totalQueuedBytes -= entry.byteLength;
211         @readableByteStreamControllerHandleQueueDrain(controller);
212         let view;
213         try {
214             view = new @Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);
215         } catch (error) {
216             return @Promise.@reject(error);
217         }
218         return @Promise.@resolve({value: view, done: false});
219     }
220
221     if (controller.@autoAllocateChunkSize !== @undefined) {
222         let buffer;
223         try {
224             buffer = new @ArrayBuffer(controller.@autoAllocateChunkSize);
225         } catch (error) {
226             return @Promise.@reject(error);
227         }
228         const pullIntoDescriptor = {
229             buffer,
230             byteOffset: 0,
231             byteLength: controller.@autoAllocateChunkSize,
232             bytesFilled: 0,
233             elementSize: 1,
234             ctor: @Uint8Array,
235             readerType: 'default'
236         };
237         controller.@pendingPullIntos.@push(pullIntoDescriptor);
238     }
239
240     const promise = @readableStreamAddReadRequest(stream);
241     @readableByteStreamControllerCallPullIfNeeded(controller);
242     return promise;
243 }
244
245 function readableByteStreamControllerShouldCallPull(controller)
246 {
247     "use strict";
248
249     const stream = controller.@controlledReadableStream;
250
251     if (stream.@state !== @streamReadable)
252         return false;
253     if (controller.@closeRequested)
254         return false;
255     if (!controller.@started)
256         return false;
257     if (@readableStreamHasDefaultReader(stream) && stream.@reader.@readRequests.length > 0)
258         return true;
259     if (@readableStreamHasBYOBReader(stream) && stream.@reader.@readIntoRequests.length > 0)
260         return true;
261     if (@readableByteStreamControllerGetDesiredSize(controller) > 0)
262         return true;
263     return false;
264 }
265
266 function readableByteStreamControllerCallPullIfNeeded(controller)
267 {
268     "use strict";
269
270     if (!@readableByteStreamControllerShouldCallPull(controller))
271         return;
272
273     if (controller.@pulling) {
274         controller.@pullAgain = true;
275         return;
276     }
277
278     @assert(!controller.@pullAgain);
279     controller.@pulling = true;
280     @promiseInvokeOrNoop(controller.@underlyingByteSource, "pull", [controller]).@then(() => {
281         controller.@pulling = false;
282         if (controller.@pullAgain) {
283             controller.@pullAgain = false;
284             @readableByteStreamControllerCallPullIfNeeded(controller);
285         }
286     }, (error) => {
287         if (controller.@controlledReadableStream.@state === @streamReadable)
288             @readableByteStreamControllerError(controller, error);
289     });
290 }
291
292 function transferBufferToCurrentRealm(buffer)
293 {
294     "use strict";
295
296     // FIXME: Determine what should be done here exactly (what is already existing in current
297     // codebase and what has to be added). According to spec, Transfer operation should be
298     // performed in order to transfer buffer to current realm. For the moment, simply return
299     // received buffer.
300     return buffer;
301 }
302
303 function readableByteStreamControllerEnqueue(controller, chunk)
304 {
305     "use strict";
306
307     const stream = controller.@controlledReadableStream;
308     @assert(!controller.@closeRequested);
309     @assert(stream.@state === @streamReadable);
310     const buffer = chunk.buffer;
311     const byteOffset = chunk.byteOffset;
312     const byteLength = chunk.byteLength;
313     const transferredBuffer = @transferBufferToCurrentRealm(buffer);
314
315     if (@readableStreamHasDefaultReader(stream)) {
316         if (!stream.@reader.@readRequests.length)
317             @readableByteStreamControllerEnqueueChunk(controller, transferredBuffer, byteOffset, byteLength);
318         else {
319             @assert(!controller.@queue.length);
320             let transferredView = new @Uint8Array(transferredBuffer, byteOffset, byteLength);
321             @readableStreamFulfillReadRequest(stream, transferredView, false);
322         }
323         return;
324     }
325
326     if (@readableStreamHasBYOBReader(stream)) {
327         // FIXME: To be implemented once ReadableStreamBYOBReader has been implemented (for the moment,
328         // test cannot be true).
329         @throwTypeError("ReadableByteStreamController enqueue operation has no support for BYOB reader");
330         return;
331     }
332
333     @assert(!@isReadableStreamLocked(stream));
334     @readableByteStreamControllerEnqueueChunk(controller, transferredBuffer, byteOffset, byteLength);
335 }
336
337 // Spec name: readableByteStreamControllerEnqueueChunkToQueue.
338 function readableByteStreamControllerEnqueueChunk(controller, buffer, byteOffset, byteLength)
339 {
340     "use strict";
341
342     controller.@queue.@push({
343         buffer: buffer,
344         byteOffset: byteOffset,
345         byteLength: byteLength
346     });
347     controller.@totalQueuedBytes += byteLength;
348 }
349
350 function readableByteStreamControllerRespond(controller, bytesWritten)
351 {
352     "use strict";
353
354     bytesWritten = @Number(bytesWritten);
355
356     if (@isNaN(bytesWritten) || bytesWritten === @Number.POSITIVE_INFINITY || bytesWritten < 0 )
357         @throwRangeError("bytesWritten has an incorrect value");
358
359     @assert(controller.@pendingPullIntos.length > 0);
360
361     @readableByteStreamControllerRespondInternal(controller, bytesWritten);
362 }
363
364 function readableByteStreamControllerRespondInternal(controller, bytesWritten)
365 {
366     "use strict";
367
368     let firstDescriptor = controller.@pendingPullIntos[0];
369     let stream = controller.@controlledReadableStream;
370
371     if (stream.@state === @streamClosed) {
372         if (bytesWritten !== 0)
373             @throwTypeError("bytesWritten is different from 0 even though stream is closed");
374         @readableByteStreamControllerRespondInClosedState(controller, firstDescriptor);
375     } else {
376         @assert(stream.@state === @streamReadable);
377         @readableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor);
378     }
379 }
380
381 function cloneArrayBuffer(srcBuffer, srcByteOffset, srcLength)
382 {
383     "use strict";
384
385     // FIXME: Below implementation returns the appropriate data but does not perform
386     // exactly what is described by ECMAScript CloneArrayBuffer operation. This should
387     // be fixed in a follow up patch implementing cloneArrayBuffer in JSC (similarly to
388     // structuredCloneArrayBuffer implementation).
389     return srcBuffer.slice(srcByteOffset, srcByteOffset + srcLength);
390 }
391
392 function readableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor)
393 {
394     "use strict";
395
396     if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength)
397         @throwRangeError("bytesWritten value is too great");
398
399     @assert(controller.@pendingPullIntos.length === 0 || controller.@pendingPullIntos[0] === pullIntoDescriptor);
400     @readableByteStreamControllerInvalidateBYOBRequest(controller);
401     pullIntoDescriptor.bytesFilled += bytesWritten;
402
403     if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize)
404         return;
405
406     @readableByteStreamControllerShiftPendingDescriptor(controller);
407     const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
408
409     if (remainderSize > 0) {
410         const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
411         const remainder = @cloneArrayBuffer(pullIntoDescriptor.buffer, end - remainderSize, remainderSize);
412         @readableByteStreamControllerEnqueueChunk(controller, remainder, 0, remainder.byteLength);
413     }
414
415     pullIntoDescriptor.buffer = @transferBufferToCurrentRealm(pullIntoDescriptor.buffer);
416     pullIntoDescriptor.bytesFilled -= remainderSize;
417     @readableByteStreamControllerCommitDescriptor(controller.@controlledReadableStream, pullIntoDescriptor);
418     @readableByteStreamControllerProcessPullDescriptors(controller);
419 }
420
421 function readableByteStreamControllerRespondInClosedState(controller, firstDescriptor)
422 {
423     "use strict";
424
425     firstDescriptor.buffer = @transferBufferToCurrentRealm(firstDescriptor.buffer);
426     @assert(firstDescriptor.bytesFilled === 0);
427
428     // FIXME: Spec does not describe below test. However, only ReadableStreamBYOBReader has a readIntoRequests
429     // property. This issue has been reported through WHATWG/streams GitHub
430     // (https://github.com/whatwg/streams/issues/686), but no solution has been provided for the moment.
431     // Therefore, below test is added as a temporary fix.
432     if (!@isReadableStreamBYOBReader(controller.@reader))
433         return;
434
435     while (controller.@reader.@readIntoRequests.length > 0) {
436         let pullIntoDescriptor = @readableByteStreamControllerShiftPendingDescriptor(controller);
437         @readableByteStreamControllerCommitDescriptor(controller.@controlledReadableStream, pullIntoDescriptor);
438     }
439 }
440
441 // Spec name: readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue (shortened for readability).
442 function readableByteStreamControllerProcessPullDescriptors(controller)
443 {
444     "use strict";
445
446     @assert(!controller.@closeRequested);
447     while (controller.@pendingPullIntos.length > 0) {
448         if (controller.@totalQueuedBytes === 0)
449             return;
450         let pullIntoDescriptor = controller.@pendingPullIntos[0];
451         if (@readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)) {
452             @readableByteStreamControllerShiftPendingDescriptor(controller);
453             @readableByteStreamControllerCommitDescriptor(controller.@controlledReadableStream, pullIntoDescriptor);
454         }
455     }
456 }
457
458 // Spec name: readableByteStreamControllerFillPullIntoDescriptorFromQueue (shortened for readability).
459 function readableByteStreamControllerFillDescriptorFromQueue(controller, pullIntoDescriptor)
460 {
461     "use strict";
462
463     const currentAlignedBytes = pullIntoDescriptor.bytesFilled - (pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize);
464     const maxBytesToCopy = controller.@totalQueuedBytes < pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled ?
465                 controller.@totalQueuedBytes : pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled;
466     const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
467     const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % pullIntoDescriptor.elementSize);
468     let totalBytesToCopyRemaining = maxBytesToCopy;
469     let ready = false;
470
471     if (maxAlignedBytes > currentAlignedBytes) {
472         totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;
473         ready = true;
474     }
475
476     while (totalBytesToCopyRemaining > 0) {
477         let headOfQueue = controller.@queue[0];
478         const bytesToCopy = totalBytesToCopyRemaining < headOfQueue.byteLength ? totalBytesToCopyRemaining : headOfQueue.byteLength;
479         // Copy appropriate part of pullIntoDescriptor.buffer to headOfQueue.buffer.
480         // Remark: this implementation is not completely aligned on the definition of CopyDataBlockBytes
481         // operation of ECMAScript (the case of Shared Data Block is not considered here, but it doesn't seem to be an issue).
482         let fromIndex = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
483         let count = bytesToCopy;
484         let toIndex = headOfQueue.byteOffset;
485         while (count > 0) {
486             headOfQueue.buffer[toIndex] = pullIntoDescriptor.buffer[fromIndex];
487             toIndex++;
488             fromIndex++;
489             count--;
490         }
491
492         if (headOfQueue.byteLength === bytesToCopy)
493             controller.@queue.@shift();
494         else {
495             headOfQueue.byteOffset += bytesToCopy;
496             headOfQueue.byteLength -= bytesToCopy;
497         }
498
499         controller.@totalQueuedBytes -= bytesToCopy;
500         @assert(controller.@pendingPullIntos.length === 0 || controller.@pendingPullIntos[0] === pullIntoDescriptor);
501         @readableByteStreamControllerInvalidateBYOBRequest(controller);
502         pullIntoDescriptor.bytesFilled += bytesToCopy;
503         totalBytesToCopyRemaining -= bytesToCopy;
504     }
505
506     if (!ready) {
507         @assert(controller.@totalQueuedBytes === 0);
508         @assert(pullIntoDescriptor.bytesFilled > 0);
509         @assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);
510     }
511
512     return ready;
513 }
514
515 // Spec name: readableByteStreamControllerShiftPendingPullInto (renamed for consistency).
516 function readableByteStreamControllerShiftPendingDescriptor(controller)
517 {
518     "use strict";
519
520     let descriptor = controller.@pendingPullIntos.@shift();
521     @readableByteStreamControllerInvalidateBYOBRequest(controller);
522     return descriptor;
523 }
524
525 function readableByteStreamControllerInvalidateBYOBRequest(controller)
526 {
527     "use strict";
528
529     if (controller.@byobRequest === @undefined)
530         return;
531     controller.@byobRequest.@associatedReadableByteStreamController = @undefined;
532     controller.@byobRequest.@view = @undefined;
533     controller.@byobRequest = @undefined;
534 }
535
536 // Spec name: readableByteStreamControllerCommitPullIntoDescriptor (shortened for readability).
537 function readableByteStreamControllerCommitDescriptor(stream, pullIntoDescriptor)
538 {
539     "use strict";
540
541     @assert(stream.@state !== @streamErrored);
542     let done = false;
543     if (stream.@state === @streamClosed) {
544         @assert(!pullIntoDescriptor.bytesFilled);
545         done = true;
546     }
547     let filledView = @readableByteStreamControllerConvertDescriptor(pullIntoDescriptor);
548     if (pullIntoDescriptor.readerType === "default")
549         @readableStreamFulfillReadRequest(stream, filledView, done);
550     else {
551         @assert(pullIntoDescriptor.readerType === "byob");
552         @readableStreamFulfillReadIntoRequest(stream, filledView, done);
553     }
554 }
555
556 // Spec name: readableByteStreamControllerConvertPullIntoDescriptor (shortened for readability).
557 function readableByteStreamControllerConvertDescriptor(pullIntoDescriptor)
558 {
559     "use strict";
560
561     @assert(pullIntoDescriptor.bytesFilled <= pullIntoDescriptor.byteLength);
562     @assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0);
563
564     return new pullIntoDescriptor.ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, pullIntoDescriptor.bytesFilled / pullIntoDescriptor.elementSize);
565 }
566
567 function readableStreamFulfillReadIntoRequest(stream, chunk, done)
568 {
569     "use strict";
570
571     stream.@reader.@readIntoRequests.@shift().@resolve.@call(@undefined, {value: chunk, done: done});
572 }