44367c13bc505a6da277a039795908fd82707494
[WebKit-https.git] / Source / WebCore / Modules / streams / ReadableByteStreamInternals.js
1 /*
2  * Copyright (C) 2016 Canon Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
14  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
17  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
18  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
19  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
21  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
23  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 // @conditional=ENABLE(READABLE_STREAM_API) && ENABLE(READABLE_BYTE_STREAM_API)
27 // @internal
28
29 function privateInitializeReadableByteStreamController(stream, underlyingByteSource, highWaterMark)
30 {
31     "use strict";
32
33     if (!@isReadableStream(stream))
34         @throwTypeError("ReadableByteStreamController needs a ReadableStream");
35
36     // readableStreamController is initialized with null value.
37     if (stream.@readableStreamController !== null)
38         @throwTypeError("ReadableStream already has a controller");
39
40     this.@controlledReadableStream = stream;
41     this.@underlyingByteSource = underlyingByteSource;
42     this.@pullAgain = false;
43     this.@pulling = false;
44     @readableByteStreamControllerClearPendingPullIntos(this);
45     this.@queue = [];
46     this.@totalQueuedBytes = 0;
47     this.@started = false;
48     this.@closeRequested = false;
49
50     let hwm = @Number(highWaterMark);
51     if (@isNaN(hwm) || hwm < 0)
52         @throwRangeError("highWaterMark value is negative or not a number");
53     this.@strategyHWM = hwm;
54
55     let autoAllocateChunkSize = underlyingByteSource.autoAllocateChunkSize;
56     if (autoAllocateChunkSize !== @undefined) {
57         autoAllocateChunkSize = @Number(autoAllocateChunkSize);
58         if (autoAllocateChunkSize <= 0 || autoAllocateChunkSize === @Number.POSITIVE_INFINITY || autoAllocateChunkSize === @Number.NEGATIVE_INFINITY)
59             @throwRangeError("autoAllocateChunkSize value is negative or equal to positive or negative infinity");
60     }
61     this.@autoAllocateChunkSize = autoAllocateChunkSize;
62     this.@pendingPullIntos = [];
63
64     const controller = this;
65     const startResult = @promiseInvokeOrNoopNoCatch(underlyingByteSource, "start", [this]).@then(() => {
66         controller.@started = true;
67         @assert(!controller.@pulling);
68         @assert(!controller.@pullAgain);
69         @readableByteStreamControllerCallPullIfNeeded(controller);
70     }, (error) => {
71         if (stream.@state === @streamReadable)
72             @readableByteStreamControllerError(controller, error);
73     });
74
75     this.@cancel = @readableByteStreamControllerCancel;
76     this.@pull = @readableByteStreamControllerPull;
77
78     return this;
79 }
80
81 function privateInitializeReadableStreamBYOBRequest(controller, view)
82 {
83     "use strict";
84
85     this.@associatedReadableByteStreamController = controller;
86     this.@view = view;
87 }
88
89 function isReadableByteStreamController(controller)
90 {
91     "use strict";
92
93     // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
94     // See corresponding function for explanations.
95     return @isObject(controller) && !!controller.@underlyingByteSource;
96 }
97
98 function isReadableStreamBYOBRequest(byobRequest)
99 {
100     "use strict";
101
102     // Same test mechanism as in isReadableStreamDefaultController (ReadableStreamInternals.js).
103     // See corresponding function for explanations.
104     return @isObject(byobRequest) && !!byobRequest.@associatedReadableByteStreamController;
105 }
106
107 function isReadableStreamBYOBReader(reader)
108 {
109     "use strict";
110
111     // FIXME: Since BYOBReader is not yet implemented, always return false.
112     // To be implemented at the same time as BYOBReader (see isReadableStreamDefaultReader
113     // to apply same model).
114     return false;
115 }
116
117 function readableByteStreamControllerCancel(controller, reason)
118 {
119     "use strict";
120
121     if (controller.@pendingPullIntos.length > 0)
122         controller.@pendingPullIntos[0].bytesFilled = 0;
123     controller.@queue = [];
124     controller.@totalQueuedBytes = 0;
125     return @promiseInvokeOrNoop(controller.@underlyingByteSource, "cancel", [reason]);
126 }
127
128 function readableByteStreamControllerError(controller, e)
129 {
130     "use strict";
131
132     @assert(controller.@controlledReadableStream.@state === @streamReadable);
133     @readableByteStreamControllerClearPendingPullIntos(controller);
134     controller.@queue = [];
135     @readableStreamError(controller.@controlledReadableStream, e);
136 }
137
138 function readableByteStreamControllerClose(controller)
139 {
140     "use strict";
141
142     @assert(!controller.@closeRequested);
143     @assert(controller.@controlledReadableStream.@state === @streamReadable);
144
145     if (controller.@totalQueuedBytes > 0) {
146         controller.@closeRequested = true;
147         return;
148     }
149
150     if (controller.@pendingPullIntos.length > 0) {
151         if (controller.@pendingPullIntos[0].bytesFilled > 0) {
152             const e = new @TypeError("Close requested while there remain pending bytes");
153             @readableByteStreamControllerError(controller, e);
154             throw e;
155         }
156     }
157
158     @readableStreamClose(controller.@controlledReadableStream);
159 }
160
161 function readableByteStreamControllerClearPendingPullIntos(controller)
162 {
163     "use strict";
164
165     // FIXME: To be implemented in conjunction with ReadableStreamBYOBRequest.
166 }
167
168 function readableByteStreamControllerGetDesiredSize(controller)
169 {
170    "use strict";
171
172    return controller.@strategyHWM - controller.@totalQueuedBytes;
173 }
174
175 function readableStreamHasBYOBReader(stream)
176 {
177     "use strict";
178
179     return stream.@reader !== @undefined && @isReadableStreamBYOBReader(stream.@reader);
180 }
181
182 function readableStreamHasDefaultReader(stream)
183 {
184     "use strict";
185
186     return stream.@reader !== @undefined && @isReadableStreamDefaultReader(stream.@reader);
187 }
188
189 function readableByteStreamControllerHandleQueueDrain(controller) {
190
191     "use strict";
192
193     @assert(controller.@controlledReadableStream.@state === @streamReadable);
194     if (!controller.@totalQueuedBytes && controller.@closeRequested)
195         @readableStreamClose(controller.@controlledReadableStream);
196     else
197         @readableByteStreamControllerCallPullIfNeeded(controller);
198 }
199
200 function readableByteStreamControllerPull(controller)
201 {
202     "use strict";
203
204     const stream = controller.@controlledReadableStream;
205     @assert(@readableStreamHasDefaultReader(stream));
206
207     if (controller.@totalQueuedBytes > 0) {
208         @assert(stream.@reader.@readRequests.length === 0);
209         const entry = controller.@queue.@shift();
210         controller.@totalQueuedBytes -= entry.byteLength;
211         @readableByteStreamControllerHandleQueueDrain(controller);
212         let view;
213         try {
214             view = new @Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);
215         } catch (error) {
216             return @Promise.@reject(error);
217         }
218         return @Promise.@resolve({value: view, done: false});
219     }
220
221     if (controller.@autoAllocateChunkSize !== @undefined) {
222         let buffer;
223         try {
224             buffer = new @ArrayBuffer(controller.@autoAllocateChunkSize);
225         } catch (error) {
226             return @Promise.@reject(error);
227         }
228         const pullIntoDescriptor = {
229             buffer,
230             byteOffset: 0,
231             byteLength: controller.@autoAllocateChunkSize,
232             bytesFilled: 0,
233             elementSize: 1,
234             ctor: @Uint8Array,
235             readerType: 'default'
236         };
237         controller.@pendingPullIntos.@push(pullIntoDescriptor);
238     }
239
240     const promise = @readableStreamAddReadRequest(stream);
241     @readableByteStreamControllerCallPullIfNeeded(controller);
242     return promise;
243 }
244
245 function readableByteStreamControllerShouldCallPull(controller)
246 {
247     "use strict";
248
249     const stream = controller.@controlledReadableStream;
250
251     if (stream.@state !== @streamReadable)
252         return false;
253     if (controller.@closeRequested)
254         return false;
255     if (!controller.@started)
256         return false;
257     if (@readableStreamHasDefaultReader(stream) && stream.@reader.@readRequests.length > 0)
258         return true;
259     if (@readableStreamHasBYOBReader(stream) && stream.@reader.@readIntoRequests.length > 0)
260         return true;
261     if (@readableByteStreamControllerGetDesiredSize(controller) > 0)
262         return true;
263     return false;
264 }
265
266 function readableByteStreamControllerCallPullIfNeeded(controller)
267 {
268     "use strict";
269
270     if (!@readableByteStreamControllerShouldCallPull(controller))
271         return;
272
273     if (controller.@pulling) {
274         controller.@pullAgain = true;
275         return;
276     }
277
278     @assert(!controller.@pullAgain);
279     controller.@pulling = true;
280     @promiseInvokeOrNoop(controller.@underlyingByteSource, "pull", [controller]).@then(() => {
281         controller.@pulling = false;
282         if (controller.@pullAgain) {
283             controller.@pullAgain = false;
284             @readableByteStreamControllerCallPullIfNeeded(controller);
285         }
286     }, (error) => {
287         if (controller.@controlledReadableStream.@state === @streamReadable)
288             @readableByteStreamControllerError(controller, error);
289     });
290 }
291
292 function transferBufferToCurrentRealm(buffer)
293 {
294     "use strict";
295
296     // FIXME: Determine what should be done here exactly (what is already existing in current
297     // codebase and what has to be added). According to spec, Transfer operation should be
298     // performed in order to transfer buffer to current realm. For the moment, simply return
299     // received buffer.
300     return buffer;
301 }
302
303 function readableByteStreamControllerEnqueue(controller, chunk)
304 {
305     "use strict";
306
307     const stream = controller.@controlledReadableStream;
308     @assert(!controller.@closeRequested);
309     @assert(stream.@state === @streamReadable);
310     const buffer = chunk.buffer;
311     const byteOffset = chunk.byteOffset;
312     const byteLength = chunk.byteLength;
313     const transferredBuffer = @transferBufferToCurrentRealm(buffer);
314
315     if (@readableStreamHasDefaultReader(stream)) {
316         if (!stream.@reader.@readRequests.length)
317             @readableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);
318         else {
319             @assert(!controller.@queue.length);
320             let transferredView = new @Uint8Array(transferredBuffer, byteOffset, byteLength);
321             @readableStreamFulfillReadRequest(stream, transferredView, false);
322         }
323         return;
324     }
325
326     if (@readableStreamHasBYOBReader(stream)) {
327         // FIXME: To be implemented once ReadableStreamBYOBReader has been implemented (for the moment,
328         // test cannot be true).
329         @throwTypeError("ReadableByteStreamController enqueue operation has no support for BYOB reader");
330         return;
331     }
332
333     @assert(!@isReadableStreamLocked(stream));
334     @readableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);
335 }
336
337 function readableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength)
338 {
339     "use strict";
340
341     controller.@queue.@push({
342         buffer: buffer,
343         byteOffset: byteOffset,
344         byteLength: byteLength
345     });
346     controller.@totalQueuedBytes += byteLength;
347 }
348
349 function readableByteStreamControllerRespond(controller, bytesWritten)
350 {
351     "use strict";
352
353     bytesWritten = @Number(bytesWritten);
354
355     if (@isNaN(bytesWritten) || bytesWritten === @Number.POSITIVE_INFINITY || bytesWritten < 0 )
356         @throwRangeError("bytesWritten has an incorrect value");
357
358     @assert(controller.@pendingPullIntos.length > 0);
359
360     @readableByteStreamControllerRespondInternal(controller, bytesWritten);
361 }
362
363 function readableByteStreamControllerRespondInternal(controller, bytesWritten)
364 {
365     "use strict";
366
367     let firstDescriptor = controller.@pendingPullIntos[0];
368     let stream = controller.@controlledReadableStream;
369
370     if (stream.@state === @streamClosed) {
371         if (bytesWritten !== 0)
372             @throwTypeError("bytesWritten is different from 0 even though stream is closed");
373         @readableByteStreamControllerRespondInClosedState(controller, firstDescriptor);
374     } else {
375         // FIXME: Also implement case of readable state (distinct patch to avoid adding too many different cases
376         // in a single patch).
377         @throwTypeError("Readable state is not yet supported");
378     }
379 }
380
381 function readableByteStreamControllerRespondInClosedState(controller, firstDescriptor)
382 {
383     "use strict";
384
385     firstDescriptor.buffer = @transferBufferToCurrentRealm(firstDescriptor.buffer);
386     @assert(firstDescriptor.bytesFilled === 0);
387
388     // FIXME: Spec does not describe below test. However, only ReadableStreamBYOBReader has a readIntoRequests
389     // property. This issue has been reported through WHATWG/streams GitHub
390     // (https://github.com/whatwg/streams/issues/686), but no solution has been provided for the moment.
391     // Therefore, below test is added as a temporary fix.
392     if (!@isReadableStreamBYOBReader(controller.@reader))
393         return;
394
395     while (controller.@reader.@readIntoRequests.length > 0) {
396         let pullIntoDescriptor = @readableByteStreamControllerShiftPendingPullInto(controller);
397         @readableByteStreamControllerCommitPullIntoDescriptor(controller.@controlledReadableStream, pullIntoDescriptor);
398     }
399 }
400
401 function readableByteStreamControllerShiftPendingPullInto(controller)
402 {
403     "use strict";
404
405     let descriptor = controller.@pendingPullIntos.@shift();
406     @readableByteStreamControllerInvalidateBYOBRequest(controller);
407     return descriptor;
408 }
409
410 function readableByteStreamControllerInvalidateBYOBRequest(controller)
411 {
412     "use strict";
413
414     if (controller.@byobRequest === @undefined)
415         return;
416     controller.@byobRequest.@associatedReadableByteStreamController = @undefined;
417     controller.@byobRequest.@view = @undefined;
418     controller.@byobRequest = @undefined;
419 }
420
421 function readableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor)
422 {
423     "use strict";
424
425     @assert(stream.@state !== @streamErrored);
426     let done = false;
427     if (stream.@state === @streamClosed) {
428         @assert(!pullIntoDescriptor.bytesFilled);
429         done = true;
430     }
431     let filledView = @readableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor);
432     if (pullIntoDescriptor.readerType === "default")
433         @readableStreamFulfillReadRequest(stream, filledView, done);
434     else {
435         @assert(pullIntoDescriptor.readerType === "byob");
436         @readableStreamFulfillReadIntoRequest(stream, filledView, done);
437     }
438 }
439
440 function readableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor)
441 {
442     "use strict";
443
444     @assert(pullIntoDescriptor.bytesFilled <= pullIntoDescriptor.bytesLength);
445     @assert(pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize === 0);
446
447     return new pullIntoDescriptor.ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, pullIntoDescriptor.bytesFilled / pullIntoDescriptor.elementSize);
448 }
449
450 function readableStreamFulfillReadIntoRequest(stream, chunk, done)
451 {
452     "use strict";
453
454     stream.@reader.@readIntoRequests.@shift().@resolve.@call(@undefined, {value: chunk, done: done});
455 }