000c0a6d2ea531594fe6249adec59ebb9fefb8e6
[WebKit-https.git] / Source / WebCore / Modules / streams / ReadableStreamInternals.js
1 /*
2  * Copyright (C) 2015 Canon Inc. All rights reserved.
3  * Copyright (C) 2015 Igalia.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
15  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
17  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
18  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
19  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
20  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
21  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
22  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26
27 // @conditional=ENABLE(STREAMS_API)
28 // @internals
29
30 function privateInitializeReadableStreamReader(stream)
31 {
32     "use strict";
33
34     if (!@isReadableStream(stream))
35        throw new @TypeError("ReadableStreamReader needs a ReadableStream");
36     if (@isReadableStreamLocked(stream))
37        throw new @TypeError("ReadableStream is locked");
38
39     this.@state = stream.@state;
40     this.@readRequests = [];
41     if (stream.@state === @streamReadable) {
42         this.@ownerReadableStream = stream;
43         this.@storedError = undefined;
44         stream.@reader = this;
45         this.@closedPromiseCapability = @newPromiseCapability(@Promise);
46         return this;
47     }
48     if (stream.@state === @streamClosed) {
49         this.@ownerReadableStream = null;
50         this.@storedError = undefined;
51         this.@closedPromiseCapability = { @promise: @Promise.@resolve() };
52         return this;
53     }
54     // FIXME: ASSERT(stream.@state === @streamErrored);
55     this.@ownerReadableStream = null;
56     this.@storedError = stream.@storedError;
57     this.@closedPromiseCapability = { @promise: @Promise.@reject(stream.@storedError) };
58
59     return this;
60 }
61
62 function privateInitializeReadableStreamController(stream)
63 {
64     "use strict";
65
66     if (!@isReadableStream(stream))
67         throw new @TypeError("ReadableStreamController needs a ReadableStream");
68     if (typeof stream.@controller !== "undefined")
69         throw new @TypeError("ReadableStream already has a controller");
70     this.@controlledReadableStream = stream;
71
72     return this;
73 }
74
75 function teeReadableStream(stream, shouldClone)
76 {
77     "use strict";
78
79     // TODO: Assert: IsReadableStream(stream) is true.
80     // TODO: Assert: Type(shouldClone) is Boolean.
81
82     let reader = stream.getReader();
83
84     let teeState = {
85         closedOrErrored: false,
86         canceled1: false,
87         canceled2: false,
88         reason1: undefined,
89         reason: undefined,
90     };
91
92     teeState.cancelPromiseCapability = @newPromiseCapability(@InternalPromise);
93
94     let pullFunction = @teeReadableStreamPullFunction(teeState, reader, shouldClone);
95
96     let underlyingSource1 = {
97         "pull": pullFunction,
98         "cancel": @teeReadableStreamBranch1CancelFunction(teeState, stream)
99     };
100
101     let underlyingSource2 = {
102         "pull": pullFunction,
103         "cancel": @teeReadableStreamBranch2CancelFunction(teeState, stream)
104     };
105
106     let branch1 = new ReadableStream(underlyingSource1);
107     let branch2 = new ReadableStream(underlyingSource2);
108
109     reader.closed.catch(function(e) {
110         if (teeState.closedOrErrored)
111             return;
112         @errorReadableStream(branch1, e);
113         @errorReadableStream(branch2, e);
114         teeState.closedOrErrored = true;
115     });
116
117     // Additional fields compared to the spec, as they are needed within pull/cancel functions.
118     teeState.branch1 = branch1;
119     teeState.branch2 = branch2;
120
121     return [branch1, branch2];
122 }
123
124 function teeReadableStreamPullFunction(teeState, reader, shouldClone)
125 {
126     return function() {
127         reader.read().then(function(result) {
128             if (result.done && !teeState.closedOrErrored) {
129                 @closeReadableStream(teeState.branch1);
130                 @closeReadableStream(teeState.branch2);
131                 teeState.closedOrErrored = true;
132             }
133             if (teeState.closedOrErrored)
134                 return;
135             if (!teeState.canceled1) {
136                 // TODO: Implement cloning if shouldClone is true
137                 @enqueueInReadableStream(teeState.branch1, result.value);
138             }
139             if (!teeState.canceled2) {
140                 // TODO: Implement cloning if shouldClone is true
141                 @enqueueInReadableStream(teeState.branch2, result.value);
142             }
143         });
144     }
145 }
146
147 function teeReadableStreamBranch1CancelFunction(teeState, stream)
148 {
149     return function(r) {
150         teeState.canceled1 = true;
151         teeState.reason1 = r;
152         if (teeState.canceled2) {
153             @cancelReadableStream(stream, [teeState.reason1, teeState.reason2]).then(teeState.cancelPromiseCapability.@resolve,
154                                                                                      teeState.cancelPromiseCapability.@reject);
155         }
156         return teeState.cancelPromiseCapability.@promise;
157     }
158 }
159
160 function teeReadableStreamBranch2CancelFunction(teeState, stream)
161 {
162     return function(r) {
163         teeState.canceled2 = true;
164         teeState.reason2 = r;
165         if (teeState.canceled1) {
166             @cancelReadableStream(stream, [teeState.reason1, teeState.reason2]).then(teeState.cancelPromiseCapability.@resolve,
167                                                                                      teeState.cancelPromiseCapability.@reject);
168         }
169         return teeState.cancelPromiseCapability.@promise;
170     }
171 }
172
173 function isReadableStream(stream)
174 {
175     "use strict";
176
177     return @isObject(stream) && !!stream.@underlyingSource;
178 }
179
180 function isReadableStreamReader(reader)
181 {
182     "use strict";
183
184     return @isObject(reader) && typeof reader.@ownerReadableStream !== "undefined";
185 }
186
187 function isReadableStreamController(controller)
188 {
189     "use strict";
190
191     return @isObject(controller) && !!controller.@controlledReadableStream;
192 }
193
194 function errorReadableStream(stream, error)
195 {
196     "use strict";
197
198     // FIXME: ASSERT(stream.@state === @streamReadable);
199     stream.@queue = @newQueue();
200     stream.@storedError = error;
201     stream.@state = @streamErrored;
202
203     if (!stream.@reader)
204         return;
205     var reader = stream.@reader;
206
207     var requests = reader.@readRequests;
208     for (var index = 0, length = requests.length; index < length; ++index)
209         requests[index].@reject.@call(undefined, error);
210     reader.@readRequests = [];
211
212     @releaseReadableStreamReader(reader);
213     reader.@storedError = error;
214     reader.@state = @streamErrored;
215
216     reader.@closedPromiseCapability.@reject.@call(undefined, error);
217 }
218
219 function requestReadableStreamPull(stream)
220 {
221     "use strict";
222
223     if (stream.@state !== @streamReadable)
224         return;
225     if (stream.@closeRequested)
226         return;
227     if (!stream.@started)
228         return;
229     if ((!@isReadableStreamLocked(stream) || !stream.@reader.@readRequests.length) && @getReadableStreamDesiredSize(stream) <= 0)
230         return;
231  
232     if (stream.@pulling) {
233         stream.@pullAgain = true;
234         return;
235     }
236
237     stream.@pulling = true;
238
239     var promise = @promiseInvokeOrNoop(stream.@underlyingSource, "pull", [stream.@controller]);
240     promise.then(function() {
241         stream.@pulling = false;
242         if (stream.@pullAgain) {
243             stream.@pullAgain = false;
244             @requestReadableStreamPull(stream);
245         }
246     }, function(error) {
247         @errorReadableStream(stream, error);
248     });
249 }
250
251 function isReadableStreamLocked(stream)
252 {
253    "use strict";
254
255     return !!stream.@reader;
256 }
257
258 function getReadableStreamDesiredSize(stream)
259 {
260    "use strict";
261
262    return stream.@strategy.highWaterMark - stream.@queue.size;
263 }
264
265 function releaseReadableStreamReader(reader)
266 {
267     "use strict";
268
269     reader.@ownerReadableStream.@reader = undefined;
270     reader.@ownerReadableStream = null;
271 }
272
273 function cancelReadableStream(stream, reason)
274 {
275     "use strict";
276
277     if (stream.@state === @streamClosed)
278         return @Promise.@resolve();
279     if (stream.@state === @streamErrored)
280         return @Promise.@reject(stream.@storedError);
281     stream.@queue = @newQueue();
282     @finishClosingReadableStream(stream);
283     return @promiseInvokeOrNoop(stream.@underlyingSource, "cancel", [reason]).then(function() { });
284 }
285
286 function finishClosingReadableStream(stream)
287 {
288     "use strict";
289
290     // FIXME: ASSERT(stream.@state ===  @streamReadable);
291     stream.@state = @streamClosed;
292     var reader = stream.@reader;
293     if (reader)
294         @closeReadableStreamReader(reader);
295 }
296
297 function closeReadableStream(stream)
298 {
299     "use strict";
300
301     // FIXME: ASSERT(!stream.@closeRequested);
302     // FIXME: ASSERT(stream.@state !== @streamErrored);
303     if (stream.@state === @streamClosed)
304         return; 
305     stream.@closeRequested = true;
306     if (!stream.@queue.content.length)
307         @finishClosingReadableStream(stream);
308 }
309
310 function closeReadableStreamReader(reader)
311 {
312     "use strict";
313
314     var requests = reader.@readRequests;
315     for (var index = 0, length = requests.length; index < length; ++index)
316         requests[index].@resolve.@call(undefined, {value:undefined, done: true});
317     reader.@readRequests = [];
318     @releaseReadableStreamReader(reader);
319     reader.@state = @streamClosed;
320     reader.@closedPromiseCapability.@resolve.@call(undefined);
321 }
322
323 function enqueueInReadableStream(stream, chunk)
324 {
325     "use strict";
326
327     // FIXME: ASSERT(!stream.@closeRequested);
328     // FIXME: ASSERT(stream.@state !== @streamErrored);
329     if (stream.@state === @streamClosed)
330         return undefined;
331     if (@isReadableStreamLocked(stream) && stream.@reader.@readRequests.length) {
332         stream.@reader.@readRequests.shift().@resolve.@call(undefined, {value: chunk, done: false});
333         @requestReadableStreamPull(stream);
334         return;
335     }
336     try {
337         var size = 1;
338         if (stream.@strategy.size) {
339             size = Number(stream.@strategy.size(chunk));
340             if (Number.isNaN(size) || size === +Infinity || size < 0)
341                 throw new RangeError("Chunk size is not valid");
342         }
343         @enqueueValueWithSize(stream.@queue, chunk, size);
344     }
345     catch(error) {
346         @errorReadableStream(stream, error);
347         throw error;
348     }
349     @requestReadableStreamPull(stream);
350 }
351
352 function readFromReadableStreamReader(reader)
353 {
354     "use strict";
355
356     if (reader.@state === @streamClosed)
357         return @Promise.@resolve({value: undefined, done: true});
358     if (reader.@state === @streamErrored)
359         return @Promise.@reject(reader.@storedError);
360     // FIXME: ASSERT(!!reader.@ownerReadableStream);
361     // FIXME: ASSERT(reader.@ownerReadableStream.@state === @streamReadable);
362     var stream = reader.@ownerReadableStream;
363     if (stream.@queue.content.length) {
364         var chunk = @dequeueValue(stream.@queue);
365         if (!stream.@closeRequested)
366             @requestReadableStreamPull(stream);
367         else if (!stream.@queue.content.length)
368             @finishClosingReadableStream(stream);
369         return @Promise.@resolve({value: chunk, done: false});
370     }
371     var readPromiseCapability = @newPromiseCapability(@Promise);
372     reader.@readRequests.push(readPromiseCapability);
373     @requestReadableStreamPull(stream);
374     return readPromiseCapability.@promise;
375 }