[GStreamer] Adaptive streaming issues
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / WebKitWebSourceGStreamer.cpp
1 /*
2  *  Copyright (C) 2009, 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *  Copyright (C) 2013 Collabora Ltd.
4  *
5  *  This library is free software; you can redistribute it and/or
6  *  modify it under the terms of the GNU Lesser General Public
7  *  License as published by the Free Software Foundation; either
8  *  version 2 of the License, or (at your option) any later version.
9  *
10  *  This library is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  *  Lesser General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Lesser General Public
16  *  License along with this library; if not, write to the Free Software
17  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19
20 #include "config.h"
21 #include "WebKitWebSourceGStreamer.h"
22
23 #if ENABLE(VIDEO) && USE(GSTREAMER)
24
25 #include "GRefPtrGStreamer.h"
26 #include "GStreamerUtilities.h"
27 #include "GUniquePtrGStreamer.h"
28 #include "HTTPHeaderNames.h"
29 #include "MainThreadNotifier.h"
30 #include "MediaPlayer.h"
31 #include "NotImplemented.h"
32 #include "PlatformMediaResourceLoader.h"
33 #include "ResourceError.h"
34 #include "ResourceHandle.h"
35 #include "ResourceHandleClient.h"
36 #include "ResourceRequest.h"
37 #include "ResourceResponse.h"
38 #include "SharedBuffer.h"
39 #include <gst/app/gstappsrc.h>
40 #include <gst/gst.h>
41 #include <gst/pbutils/missing-plugins.h>
42 #include <wtf/MainThread.h>
43 #include <wtf/Noncopyable.h>
44 #include <wtf/glib/GMutexLocker.h>
45 #include <wtf/glib/GRefPtr.h>
46 #include <wtf/glib/GUniquePtr.h>
47 #include <wtf/text/CString.h>
48
49 using namespace WebCore;
50
51 class StreamingClient {
52     public:
53         StreamingClient(WebKitWebSrc*);
54         virtual ~StreamingClient();
55
56     protected:
57         char* createReadBuffer(size_t requestedSize, size_t& actualSize);
58         void handleResponseReceived(const ResourceResponse&);
59         void handleDataReceived(const char*, int);
60         void handleNotifyFinished();
61
62         GstElement* m_src;
63 };
64
65 class CachedResourceStreamingClient final : public PlatformMediaResourceClient, public StreamingClient {
66     WTF_MAKE_NONCOPYABLE(CachedResourceStreamingClient);
67     public:
68         CachedResourceStreamingClient(WebKitWebSrc*);
69         virtual ~CachedResourceStreamingClient();
70
71     private:
72         // PlatformMediaResourceClient virtual methods.
73 #if USE(SOUP)
74         char* getOrCreateReadBuffer(PlatformMediaResource&, size_t requestedSize, size_t& actualSize) override;
75 #endif
76         void responseReceived(PlatformMediaResource&, const ResourceResponse&) override;
77         void dataReceived(PlatformMediaResource&, const char*, int) override;
78         void accessControlCheckFailed(PlatformMediaResource&, const ResourceError&) override;
79         void loadFailed(PlatformMediaResource&, const ResourceError&) override;
80         void loadFinished(PlatformMediaResource&) override;
81 };
82
83 class ResourceHandleStreamingClient : public ResourceHandleClient, public StreamingClient {
84     WTF_MAKE_NONCOPYABLE(ResourceHandleStreamingClient); WTF_MAKE_FAST_ALLOCATED;
85     public:
86         ResourceHandleStreamingClient(WebKitWebSrc*, const ResourceRequest&);
87         virtual ~ResourceHandleStreamingClient();
88
89         // StreamingClient virtual methods.
90         bool loadFailed() const;
91         void setDefersLoading(bool);
92
93     private:
94         // ResourceHandleClient virtual methods.
95 #if USE(SOUP)
96         char* getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize) override;
97 #endif
98         void willSendRequest(ResourceHandle*, ResourceRequest&, const ResourceResponse&) override;
99         void didReceiveResponse(ResourceHandle*, const ResourceResponse&) override;
100         void didReceiveData(ResourceHandle*, const char*, unsigned, int) override;
101         void didReceiveBuffer(ResourceHandle*, PassRefPtr<SharedBuffer>, int encodedLength) override;
102         void didFinishLoading(ResourceHandle*, double /*finishTime*/) override;
103         void didFail(ResourceHandle*, const ResourceError&) override;
104         void wasBlocked(ResourceHandle*) override;
105         void cannotShowURL(ResourceHandle*) override;
106
107         RefPtr<ResourceHandle> m_resource;
108 };
109
110 enum MainThreadSourceNotification {
111     Start = 1 << 0,
112     Stop = 1 << 1,
113     NeedData = 1 << 2,
114     EnoughData = 1 << 3,
115     Seek = 1 << 4
116 };
117
118 #define WEBKIT_WEB_SRC_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), WEBKIT_TYPE_WEB_SRC, WebKitWebSrcPrivate))
119 struct _WebKitWebSrcPrivate {
120     GstAppSrc* appsrc;
121     GstPad* srcpad;
122     gchar* uri;
123     bool keepAlive;
124     GUniquePtr<GstStructure> extraHeaders;
125     bool compress;
126     GUniquePtr<gchar> httpMethod;
127
128     WebCore::MediaPlayer* player;
129
130     RefPtr<PlatformMediaResourceLoader> loader;
131     RefPtr<PlatformMediaResource> resource;
132     ResourceHandleStreamingClient* client;
133
134     bool didPassAccessControlCheck;
135
136     guint64 offset;
137     guint64 size;
138     gboolean seekable;
139     gboolean paused;
140     bool isSeeking;
141
142     guint64 requestedOffset;
143
144     MainThreadNotifier<MainThreadSourceNotification> notifier;
145     GRefPtr<GstBuffer> buffer;
146 };
147
148 enum {
149     PROP_0,
150     PROP_LOCATION,
151     PROP_KEEP_ALIVE,
152     PROP_EXTRA_HEADERS,
153     PROP_COMPRESS,
154     PROP_METHOD
155 };
156
157 static GstStaticPadTemplate srcTemplate = GST_STATIC_PAD_TEMPLATE("src",
158                                                                   GST_PAD_SRC,
159                                                                   GST_PAD_ALWAYS,
160                                                                   GST_STATIC_CAPS_ANY);
161
162 GST_DEBUG_CATEGORY_STATIC(webkit_web_src_debug);
163 #define GST_CAT_DEFAULT webkit_web_src_debug
164
165 static void webKitWebSrcUriHandlerInit(gpointer gIface, gpointer ifaceData);
166
167 static void webKitWebSrcDispose(GObject*);
168 static void webKitWebSrcFinalize(GObject*);
169 static void webKitWebSrcSetProperty(GObject*, guint propertyID, const GValue*, GParamSpec*);
170 static void webKitWebSrcGetProperty(GObject*, guint propertyID, GValue*, GParamSpec*);
171 static GstStateChangeReturn webKitWebSrcChangeState(GstElement*, GstStateChange);
172
173 static gboolean webKitWebSrcQueryWithParent(GstPad*, GstObject*, GstQuery*);
174
175 static void webKitWebSrcNeedData(WebKitWebSrc*);
176 static void webKitWebSrcEnoughData(WebKitWebSrc*);
177 static void webKitWebSrcSeek(WebKitWebSrc*);
178
179 static GstAppSrcCallbacks appsrcCallbacks = {
180     // need_data
181     [](GstAppSrc*, guint, gpointer userData) {
182         WebKitWebSrc* src = WEBKIT_WEB_SRC(userData);
183         WebKitWebSrcPrivate* priv = src->priv;
184
185         {
186             WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
187             if (!priv->paused)
188                 return;
189         }
190
191         GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
192         priv->notifier.notify(MainThreadSourceNotification::NeedData, [protector] { webKitWebSrcNeedData(protector.get()); });
193     },
194     // enough_data
195     [](GstAppSrc*, gpointer userData) {
196         WebKitWebSrc* src = WEBKIT_WEB_SRC(userData);
197         WebKitWebSrcPrivate* priv = src->priv;
198
199         {
200             WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
201             if (priv->paused)
202                 return;
203         }
204
205         GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
206         priv->notifier.notify(MainThreadSourceNotification::EnoughData, [protector] { webKitWebSrcEnoughData(protector.get()); });
207     },
208     // seek_data
209     [](GstAppSrc*, guint64 offset, gpointer userData) -> gboolean {
210         WebKitWebSrc* src = WEBKIT_WEB_SRC(userData);
211         WebKitWebSrcPrivate* priv = src->priv;
212
213         {
214             WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
215             if (offset == priv->offset && priv->requestedOffset == priv->offset)
216                 return TRUE;
217
218             if (!priv->seekable)
219                 return FALSE;
220
221             priv->isSeeking = true;
222             priv->requestedOffset = offset;
223         }
224
225         GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
226         priv->notifier.notify(MainThreadSourceNotification::Seek, [protector] { webKitWebSrcSeek(protector.get()); });
227         return TRUE;
228     },
229     { nullptr }
230 };
231
232 #define webkit_web_src_parent_class parent_class
233 // We split this out into another macro to avoid a check-webkit-style error.
234 #define WEBKIT_WEB_SRC_CATEGORY_INIT GST_DEBUG_CATEGORY_INIT(webkit_web_src_debug, "webkitwebsrc", 0, "websrc element");
235 G_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_BIN,
236                          G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, webKitWebSrcUriHandlerInit);
237                          WEBKIT_WEB_SRC_CATEGORY_INIT);
238
239 static void webkit_web_src_class_init(WebKitWebSrcClass* klass)
240 {
241     GObjectClass* oklass = G_OBJECT_CLASS(klass);
242     GstElementClass* eklass = GST_ELEMENT_CLASS(klass);
243
244     oklass->dispose = webKitWebSrcDispose;
245     oklass->finalize = webKitWebSrcFinalize;
246     oklass->set_property = webKitWebSrcSetProperty;
247     oklass->get_property = webKitWebSrcGetProperty;
248
249     gst_element_class_add_pad_template(eklass,
250                                        gst_static_pad_template_get(&srcTemplate));
251     gst_element_class_set_metadata(eklass, "WebKit Web source element", "Source", "Handles HTTP/HTTPS uris",
252                                "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
253
254     /* Allows setting the uri using the 'location' property, which is used
255      * for example by gst_element_make_from_uri() */
256     g_object_class_install_property(oklass,
257                                     PROP_LOCATION,
258                                     g_param_spec_string("location",
259                                                         "location",
260                                                         "Location to read from",
261                                                         0,
262                                                         (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
263
264     g_object_class_install_property(oklass, PROP_KEEP_ALIVE,
265         g_param_spec_boolean("keep-alive", "keep-alive", "Use HTTP persistent connections",
266             FALSE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
267
268     g_object_class_install_property(oklass, PROP_EXTRA_HEADERS,
269         g_param_spec_boxed("extra-headers", "Extra Headers", "Extra headers to append to the HTTP request",
270             GST_TYPE_STRUCTURE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
271
272     g_object_class_install_property(oklass, PROP_COMPRESS,
273         g_param_spec_boolean("compress", "Compress", "Allow compressed content encodings",
274             FALSE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
275
276     g_object_class_install_property(oklass, PROP_METHOD,
277         g_param_spec_string("method", "method", "The HTTP method to use (default: GET)",
278             nullptr, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
279
280     eklass->change_state = webKitWebSrcChangeState;
281
282     g_type_class_add_private(klass, sizeof(WebKitWebSrcPrivate));
283 }
284
285 static void webkit_web_src_init(WebKitWebSrc* src)
286 {
287     WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC_GET_PRIVATE(src);
288
289     src->priv = priv;
290     new (priv) WebKitWebSrcPrivate();
291
292     priv->appsrc = GST_APP_SRC(gst_element_factory_make("appsrc", 0));
293     if (!priv->appsrc) {
294         GST_ERROR_OBJECT(src, "Failed to create appsrc");
295         return;
296     }
297
298     gst_bin_add(GST_BIN(src), GST_ELEMENT(priv->appsrc));
299
300
301     GRefPtr<GstPad> targetPad = adoptGRef(gst_element_get_static_pad(GST_ELEMENT(priv->appsrc), "src"));
302     priv->srcpad = webkitGstGhostPadFromStaticTemplate(&srcTemplate, "src", targetPad.get());
303
304     gst_element_add_pad(GST_ELEMENT(src), priv->srcpad);
305
306     GST_OBJECT_FLAG_SET(priv->srcpad, GST_PAD_FLAG_NEED_PARENT);
307     gst_pad_set_query_function(priv->srcpad, webKitWebSrcQueryWithParent);
308
309     gst_app_src_set_callbacks(priv->appsrc, &appsrcCallbacks, src, 0);
310     gst_app_src_set_emit_signals(priv->appsrc, FALSE);
311     gst_app_src_set_stream_type(priv->appsrc, GST_APP_STREAM_TYPE_SEEKABLE);
312
313     // 512k is a abitrary number but we should choose a value
314     // here to not pause/unpause the SoupMessage too often and
315     // to make sure there's always some data available for
316     // GStreamer to handle.
317     gst_app_src_set_max_bytes(priv->appsrc, 512 * 1024);
318
319     // Emit the need-data signal if the queue contains less
320     // than 20% of data. Without this the need-data signal
321     // is emitted when the queue is empty, we then dispatch
322     // the soup message unpausing to the main loop and from
323     // there unpause the soup message. This already takes
324     // quite some time and libsoup even needs some more time
325     // to actually provide data again. If we do all this
326     // already if the queue is 20% empty, it's much more
327     // likely that libsoup already provides new data before
328     // the queue is really empty.
329     // This might need tweaking for ports not using libsoup.
330     g_object_set(priv->appsrc, "min-percent", 20, NULL);
331
332     gst_app_src_set_caps(priv->appsrc, 0);
333     gst_app_src_set_size(priv->appsrc, -1);
334 }
335
336 static void webKitWebSrcDispose(GObject* object)
337 {
338     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
339     WebKitWebSrcPrivate* priv = src->priv;
340
341     priv->player = 0;
342
343     GST_CALL_PARENT(G_OBJECT_CLASS, dispose, (object));
344 }
345
346 static void webKitWebSrcFinalize(GObject* object)
347 {
348     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
349     WebKitWebSrcPrivate* priv = src->priv;
350
351     g_free(priv->uri);
352     priv->~WebKitWebSrcPrivate();
353
354     GST_CALL_PARENT(G_OBJECT_CLASS, finalize, (object));
355 }
356
357 static void webKitWebSrcSetProperty(GObject* object, guint propID, const GValue* value, GParamSpec* pspec)
358 {
359     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
360
361     switch (propID) {
362     case PROP_LOCATION:
363         gst_uri_handler_set_uri(reinterpret_cast<GstURIHandler*>(src), g_value_get_string(value), 0);
364         break;
365     case PROP_KEEP_ALIVE:
366         src->priv->keepAlive = g_value_get_boolean(value);
367         break;
368     case PROP_EXTRA_HEADERS: {
369         const GstStructure* s = gst_value_get_structure(value);
370         src->priv->extraHeaders.reset(s ? gst_structure_copy(s) : nullptr);
371         break;
372     }
373     case PROP_COMPRESS:
374         src->priv->compress = g_value_get_boolean(value);
375         break;
376     case PROP_METHOD:
377         src->priv->httpMethod.reset(g_value_dup_string(value));
378         break;
379     default:
380         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propID, pspec);
381         break;
382     }
383 }
384
385 static void webKitWebSrcGetProperty(GObject* object, guint propID, GValue* value, GParamSpec* pspec)
386 {
387     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
388     WebKitWebSrcPrivate* priv = src->priv;
389
390     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
391     switch (propID) {
392     case PROP_LOCATION:
393         g_value_set_string(value, priv->uri);
394         break;
395     case PROP_KEEP_ALIVE:
396         g_value_set_boolean(value, priv->keepAlive);
397         break;
398     case PROP_EXTRA_HEADERS:
399         gst_value_set_structure(value, priv->extraHeaders.get());
400         break;
401     case PROP_COMPRESS:
402         g_value_set_boolean(value, priv->compress);
403         break;
404     case PROP_METHOD:
405         g_value_set_string(value, priv->httpMethod.get());
406         break;
407     default:
408         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propID, pspec);
409         break;
410     }
411 }
412
413 static void webKitWebSrcStop(WebKitWebSrc* src)
414 {
415     WebKitWebSrcPrivate* priv = src->priv;
416
417     ASSERT(isMainThread());
418
419     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
420
421     bool wasSeeking = std::exchange(priv->isSeeking, false);
422
423     priv->notifier.cancelPendingNotifications(MainThreadSourceNotification::NeedData | MainThreadSourceNotification::EnoughData | MainThreadSourceNotification::Seek);
424
425     if (priv->client) {
426         delete priv->client;
427         priv->client = 0;
428     }
429
430     if (!priv->keepAlive)
431         priv->loader = nullptr;
432
433     if (priv->buffer) {
434         unmapGstBuffer(priv->buffer.get());
435         priv->buffer.clear();
436     }
437
438     priv->paused = FALSE;
439
440     priv->offset = 0;
441     priv->seekable = FALSE;
442
443     if (!wasSeeking) {
444         priv->size = 0;
445         priv->requestedOffset = 0;
446         priv->player = 0;
447     }
448
449     locker.unlock();
450
451     if (priv->appsrc) {
452         gst_app_src_set_caps(priv->appsrc, 0);
453         if (!wasSeeking)
454             gst_app_src_set_size(priv->appsrc, -1);
455     }
456
457     GST_DEBUG_OBJECT(src, "Stopped request");
458 }
459
460 static bool webKitWebSrcSetExtraHeader(GQuark fieldId, const GValue* value, gpointer userData)
461 {
462     GUniquePtr<gchar> fieldContent;
463
464     if (G_VALUE_HOLDS_STRING(value))
465         fieldContent.reset(g_value_dup_string(value));
466     else {
467         GValue dest = G_VALUE_INIT;
468
469         g_value_init(&dest, G_TYPE_STRING);
470         if (g_value_transform(value, &dest))
471             fieldContent.reset(g_value_dup_string(&dest));
472     }
473
474     const gchar* fieldName = g_quark_to_string(fieldId);
475     if (!fieldContent.get()) {
476         GST_ERROR("extra-headers field '%s' contains no value or can't be converted to a string", fieldName);
477         return false;
478     }
479
480     GST_DEBUG("Appending extra header: \"%s: %s\"", fieldName, fieldContent.get());
481     ResourceRequest* request = static_cast<ResourceRequest*>(userData);
482     request->setHTTPHeaderField(fieldName, fieldContent.get());
483     return true;
484 }
485
486 static gboolean webKitWebSrcProcessExtraHeaders(GQuark fieldId, const GValue* value, gpointer userData)
487 {
488     if (G_VALUE_TYPE(value) == GST_TYPE_ARRAY) {
489         unsigned size = gst_value_array_get_size(value);
490
491         for (unsigned i = 0; i < size; i++) {
492             if (!webKitWebSrcSetExtraHeader(fieldId, gst_value_array_get_value(value, i), userData))
493                 return FALSE;
494         }
495         return TRUE;
496     }
497
498     if (G_VALUE_TYPE(value) == GST_TYPE_LIST) {
499         unsigned size = gst_value_list_get_size(value);
500
501         for (unsigned i = 0; i < size; i++) {
502             if (!webKitWebSrcSetExtraHeader(fieldId, gst_value_list_get_value(value, i), userData))
503                 return FALSE;
504         }
505         return TRUE;
506     }
507
508     return webKitWebSrcSetExtraHeader(fieldId, value, userData);
509 }
510
511 static void webKitWebSrcStart(WebKitWebSrc* src)
512 {
513     WebKitWebSrcPrivate* priv = src->priv;
514
515     ASSERT(isMainThread());
516
517     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
518
519     priv->didPassAccessControlCheck = false;
520
521     if (!priv->uri) {
522         GST_ERROR_OBJECT(src, "No URI provided");
523         locker.unlock();
524         webKitWebSrcStop(src);
525         return;
526     }
527
528     ASSERT(!priv->client);
529
530     GST_DEBUG_OBJECT(src, "Fetching %s", priv->uri);
531     URL url = URL(URL(), priv->uri);
532
533     ResourceRequest request(url);
534     request.setAllowCookies(true);
535     request.setFirstPartyForCookies(url);
536
537     priv->size = 0;
538
539     if (priv->player)
540         request.setHTTPReferrer(priv->player->referrer());
541
542     if (priv->httpMethod.get())
543         request.setHTTPMethod(priv->httpMethod.get());
544
545 #if USE(SOUP)
546     // By default, HTTP Accept-Encoding is disabled here as we don't
547     // want the received response to be encoded in any way as we need
548     // to rely on the proper size of the returned data on
549     // didReceiveResponse.
550     // If Accept-Encoding is used, the server may send the data in encoded format and
551     // request.expectedContentLength() will have the "wrong" size (the size of the
552     // compressed data), even though the data received in didReceiveData is uncompressed.
553     // This is however useful to enable for adaptive streaming
554     // scenarios, when the demuxer needs to download playlists.
555     if (!priv->compress)
556         request.setAcceptEncoding(false);
557 #endif
558
559     // Let Apple web servers know we want to access their nice movie trailers.
560     if (!g_ascii_strcasecmp("movies.apple.com", url.host().utf8().data())
561         || !g_ascii_strcasecmp("trailers.apple.com", url.host().utf8().data()))
562         request.setHTTPUserAgent("Quicktime/7.6.6");
563
564     if (priv->requestedOffset) {
565         GUniquePtr<gchar> val(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", priv->requestedOffset));
566         request.setHTTPHeaderField(HTTPHeaderName::Range, val.get());
567     }
568     priv->offset = priv->requestedOffset;
569
570     if (!priv->keepAlive) {
571         GST_DEBUG_OBJECT(src, "Persistent connection support disabled");
572         request.setHTTPHeaderField(HTTPHeaderName::Connection, "close");
573     }
574
575     if (priv->extraHeaders)
576         gst_structure_foreach(priv->extraHeaders.get(), webKitWebSrcProcessExtraHeaders, &request);
577
578     // We always request Icecast/Shoutcast metadata, just in case ...
579     request.setHTTPHeaderField(HTTPHeaderName::IcyMetadata, "1");
580
581     bool loadFailed = true;
582     if (priv->player && !priv->loader)
583         priv->loader = priv->player->createResourceLoader();
584
585     if (priv->loader) {
586         PlatformMediaResourceLoader::LoadOptions loadOptions = 0;
587         if (request.url().protocolIsBlob())
588             loadOptions |= PlatformMediaResourceLoader::LoadOption::BufferData;
589         priv->resource = priv->loader->requestResource(request, loadOptions);
590         loadFailed = !priv->resource;
591
592         if (priv->resource)
593             priv->resource->setClient(std::make_unique<CachedResourceStreamingClient>(src));
594     } else {
595         priv->client = new ResourceHandleStreamingClient(src, request);
596         loadFailed = priv->client->loadFailed();
597     }
598
599     if (loadFailed) {
600         GST_ERROR_OBJECT(src, "Failed to setup streaming client");
601         if (priv->client) {
602             delete priv->client;
603             priv->client = nullptr;
604         }
605         priv->loader = nullptr;
606         priv->resource = nullptr;
607         locker.unlock();
608         webKitWebSrcStop(src);
609         return;
610     }
611     GST_DEBUG_OBJECT(src, "Started request");
612 }
613
614 static GstStateChangeReturn webKitWebSrcChangeState(GstElement* element, GstStateChange transition)
615 {
616     GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
617     WebKitWebSrc* src = WEBKIT_WEB_SRC(element);
618     WebKitWebSrcPrivate* priv = src->priv;
619
620     switch (transition) {
621     case GST_STATE_CHANGE_NULL_TO_READY:
622         if (!priv->appsrc) {
623             gst_element_post_message(element,
624                                      gst_missing_element_message_new(element, "appsrc"));
625             GST_ELEMENT_ERROR(src, CORE, MISSING_PLUGIN, (0), ("no appsrc"));
626             return GST_STATE_CHANGE_FAILURE;
627         }
628         break;
629     default:
630         break;
631     }
632
633     ret = GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
634     if (G_UNLIKELY(ret == GST_STATE_CHANGE_FAILURE)) {
635         GST_DEBUG_OBJECT(src, "State change failed");
636         return ret;
637     }
638
639     switch (transition) {
640     case GST_STATE_CHANGE_READY_TO_PAUSED:
641     {
642         GST_DEBUG_OBJECT(src, "READY->PAUSED");
643         GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
644         priv->notifier.notify(MainThreadSourceNotification::Start, [protector] { webKitWebSrcStart(protector.get()); });
645         break;
646     }
647     case GST_STATE_CHANGE_PAUSED_TO_READY:
648     {
649         GST_DEBUG_OBJECT(src, "PAUSED->READY");
650         priv->notifier.cancelPendingNotifications();
651         GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
652         priv->notifier.notify(MainThreadSourceNotification::Stop, [protector] { webKitWebSrcStop(protector.get()); });
653         break;
654     }
655     default:
656         break;
657     }
658
659     return ret;
660 }
661
662 static gboolean webKitWebSrcQueryWithParent(GstPad* pad, GstObject* parent, GstQuery* query)
663 {
664     WebKitWebSrc* src = WEBKIT_WEB_SRC(GST_ELEMENT(parent));
665     gboolean result = FALSE;
666
667     switch (GST_QUERY_TYPE(query)) {
668     case GST_QUERY_DURATION: {
669         GstFormat format;
670
671         gst_query_parse_duration(query, &format, NULL);
672
673         GST_DEBUG_OBJECT(src, "duration query in format %s", gst_format_get_name(format));
674         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
675         if (format == GST_FORMAT_BYTES && src->priv->size > 0) {
676             gst_query_set_duration(query, format, src->priv->size);
677             result = TRUE;
678         }
679         break;
680     }
681     case GST_QUERY_URI: {
682         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
683         gst_query_set_uri(query, src->priv->uri);
684         result = TRUE;
685         break;
686     }
687 #if GST_CHECK_VERSION(1, 2, 0)
688     case GST_QUERY_SCHEDULING: {
689         GstSchedulingFlags flags;
690         int minSize, maxSize, align;
691
692         gst_query_parse_scheduling(query, &flags, &minSize, &maxSize, &align);
693         gst_query_set_scheduling(query, static_cast<GstSchedulingFlags>(flags | GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED), minSize, maxSize, align);
694         result = TRUE;
695         break;
696     }
697 #endif
698     default: {
699         GRefPtr<GstPad> target = adoptGRef(gst_ghost_pad_get_target(GST_GHOST_PAD_CAST(pad)));
700
701         // Forward the query to the proxy target pad.
702         if (target)
703             result = gst_pad_query(target.get(), query);
704         break;
705     }
706     }
707
708     return result;
709 }
710
711 static bool urlHasSupportedProtocol(const URL& url)
712 {
713     return url.isValid() && (url.protocolIsInHTTPFamily() || url.protocolIsBlob());
714 }
715
716 // uri handler interface
717
718 static GstURIType webKitWebSrcUriGetType(GType)
719 {
720     return GST_URI_SRC;
721 }
722
723 const gchar* const* webKitWebSrcGetProtocols(GType)
724 {
725     static const char* protocols[] = {"http", "https", "blob", 0 };
726     return protocols;
727 }
728
729 static gchar* webKitWebSrcGetUri(GstURIHandler* handler)
730 {
731     WebKitWebSrc* src = WEBKIT_WEB_SRC(handler);
732     gchar* ret;
733
734     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
735     ret = g_strdup(src->priv->uri);
736     return ret;
737 }
738
739 static gboolean webKitWebSrcSetUri(GstURIHandler* handler, const gchar* uri, GError** error)
740 {
741     WebKitWebSrc* src = WEBKIT_WEB_SRC(handler);
742     WebKitWebSrcPrivate* priv = src->priv;
743
744     if (GST_STATE(src) >= GST_STATE_PAUSED) {
745         GST_ERROR_OBJECT(src, "URI can only be set in states < PAUSED");
746         return FALSE;
747     }
748
749     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
750
751     g_free(priv->uri);
752     priv->uri = 0;
753
754     if (!uri)
755         return TRUE;
756
757     URL url(URL(), uri);
758     if (!urlHasSupportedProtocol(url)) {
759         g_set_error(error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, "Invalid URI '%s'", uri);
760         return FALSE;
761     }
762
763     priv->uri = g_strdup(url.string().utf8().data());
764     return TRUE;
765 }
766
767 static void webKitWebSrcUriHandlerInit(gpointer gIface, gpointer)
768 {
769     GstURIHandlerInterface* iface = (GstURIHandlerInterface *) gIface;
770
771     iface->get_type = webKitWebSrcUriGetType;
772     iface->get_protocols = webKitWebSrcGetProtocols;
773     iface->get_uri = webKitWebSrcGetUri;
774     iface->set_uri = webKitWebSrcSetUri;
775 }
776
777 // appsrc callbacks
778
779 static void webKitWebSrcNeedData(WebKitWebSrc* src)
780 {
781     WebKitWebSrcPrivate* priv = src->priv;
782
783     ASSERT(isMainThread());
784
785     GST_DEBUG_OBJECT(src, "Need more data");
786
787     {
788         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
789         priv->paused = FALSE;
790     }
791
792     if (priv->client)
793         priv->client->setDefersLoading(false);
794     if (priv->resource)
795         priv->resource->setDefersLoading(false);
796 }
797
798 static void webKitWebSrcEnoughData(WebKitWebSrc* src)
799 {
800     WebKitWebSrcPrivate* priv = src->priv;
801
802     ASSERT(isMainThread());
803
804     GST_DEBUG_OBJECT(src, "Have enough data");
805
806     {
807         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
808         priv->paused = TRUE;
809     }
810
811     if (priv->client)
812         priv->client->setDefersLoading(true);
813     if (priv->resource)
814         priv->resource->setDefersLoading(true);
815 }
816
817 static void webKitWebSrcSeek(WebKitWebSrc* src)
818 {
819     ASSERT(isMainThread());
820
821     GST_DEBUG_OBJECT(src, "Seeking to offset: %" G_GUINT64_FORMAT, src->priv->requestedOffset);
822
823     webKitWebSrcStop(src);
824     webKitWebSrcStart(src);
825 }
826
827 void webKitWebSrcSetMediaPlayer(WebKitWebSrc* src, WebCore::MediaPlayer* player)
828 {
829     ASSERT(player);
830     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
831     src->priv->player = player;
832 }
833
834 bool webKitSrcPassedCORSAccessCheck(WebKitWebSrc* src)
835 {
836     return src->priv->didPassAccessControlCheck;
837 }
838
839 StreamingClient::StreamingClient(WebKitWebSrc* src)
840     : m_src(static_cast<GstElement*>(gst_object_ref(src)))
841 {
842 }
843
844 StreamingClient::~StreamingClient()
845 {
846     gst_object_unref(m_src);
847 }
848
849 char* StreamingClient::createReadBuffer(size_t requestedSize, size_t& actualSize)
850 {
851     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
852     WebKitWebSrcPrivate* priv = src->priv;
853
854     ASSERT(!priv->buffer);
855
856     GstBuffer* buffer = gst_buffer_new_and_alloc(requestedSize);
857
858     mapGstBuffer(buffer);
859
860     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
861     priv->buffer = adoptGRef(buffer);
862     locker.unlock();
863
864     actualSize = gst_buffer_get_size(buffer);
865     return getGstBufferDataPointer(buffer);
866 }
867
868 void StreamingClient::handleResponseReceived(const ResourceResponse& response)
869 {
870     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
871     WebKitWebSrcPrivate* priv = src->priv;
872
873     GST_DEBUG_OBJECT(src, "Received response: %d", response.httpStatusCode());
874
875     if (response.httpStatusCode() >= 400) {
876         GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received %d HTTP error code", response.httpStatusCode()), (nullptr));
877         gst_app_src_end_of_stream(priv->appsrc);
878         webKitWebSrcStop(src);
879         return;
880     }
881
882     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
883
884     if (priv->isSeeking) {
885         GST_DEBUG_OBJECT(src, "Seek in progress, ignoring response");
886         return;
887     }
888
889     if (priv->requestedOffset) {
890         // Seeking ... we expect a 206 == PARTIAL_CONTENT
891         if (response.httpStatusCode() == 200) {
892             // Range request didn't have a ranged response; resetting offset.
893             priv->offset = 0;
894         } else if (response.httpStatusCode() != 206) {
895             // Range request completely failed.
896             locker.unlock();
897             GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received unexpected %d HTTP status code", response.httpStatusCode()), (nullptr));
898             gst_app_src_end_of_stream(priv->appsrc);
899             webKitWebSrcStop(src);
900             return;
901         }
902     }
903
904     long long length = response.expectedContentLength();
905     if (length > 0 && priv->requestedOffset && response.httpStatusCode() == 206)
906         length += priv->requestedOffset;
907
908     priv->size = length >= 0 ? length : 0;
909     priv->seekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
910
911     locker.unlock();
912
913     // notify size/duration
914     if (length > 0) {
915         gst_app_src_set_size(priv->appsrc, length);
916     } else
917         gst_app_src_set_size(priv->appsrc, -1);
918
919     gst_app_src_set_caps(priv->appsrc, 0);
920 }
921
922 void StreamingClient::handleDataReceived(const char* data, int length)
923 {
924     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
925     WebKitWebSrcPrivate* priv = src->priv;
926
927     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
928
929     GST_LOG_OBJECT(src, "Have %lld bytes of data", priv->buffer ? static_cast<long long>(gst_buffer_get_size(priv->buffer.get())) : length);
930
931     ASSERT(!priv->buffer || data == getGstBufferDataPointer(priv->buffer.get()));
932
933     if (priv->buffer)
934         unmapGstBuffer(priv->buffer.get());
935
936     if (priv->isSeeking) {
937         GST_DEBUG_OBJECT(src, "Seek in progress, ignoring data");
938         priv->buffer.clear();
939         return;
940     }
941
942     if (priv->offset < priv->requestedOffset) {
943         // Range request failed; seeking manually.
944         if (priv->offset + length <= priv->requestedOffset) {
945             // Discard all the buffers coming before the requested seek position.
946             priv->offset += length;
947             priv->buffer.clear();
948             return;
949         }
950
951         if (priv->offset + length > priv->requestedOffset) {
952             guint64 offset = priv->requestedOffset - priv->offset;
953             data += offset;
954             length -= offset;
955             if (priv->buffer)
956                 gst_buffer_resize(priv->buffer.get(), offset, -1);
957             priv->offset = priv->requestedOffset;
958         }
959
960         priv->requestedOffset = 0;
961     }
962
963     // Ports using the GStreamer backend but not the soup implementation of ResourceHandle
964     // won't be using buffers provided by this client, the buffer is created here in that case.
965     if (!priv->buffer)
966         priv->buffer = adoptGRef(createGstBufferForData(data, length));
967     else
968         gst_buffer_set_size(priv->buffer.get(), static_cast<gssize>(length));
969
970     GST_BUFFER_OFFSET(priv->buffer.get()) = priv->offset;
971     if (priv->requestedOffset == priv->offset)
972         priv->requestedOffset += length;
973     priv->offset += length;
974     // priv->size == 0 if received length on didReceiveResponse < 0.
975     if (priv->size > 0 && priv->offset > priv->size) {
976         GST_DEBUG_OBJECT(src, "Updating internal size from %" G_GUINT64_FORMAT " to %" G_GUINT64_FORMAT, priv->size, priv->offset);
977         gst_app_src_set_size(priv->appsrc, priv->offset);
978         priv->size = priv->offset;
979     }
980     GST_BUFFER_OFFSET_END(priv->buffer.get()) = priv->offset;
981
982     locker.unlock();
983
984     GstFlowReturn ret = gst_app_src_push_buffer(priv->appsrc, priv->buffer.leakRef());
985     if (ret != GST_FLOW_OK && ret != GST_FLOW_EOS)
986         GST_ELEMENT_ERROR(src, CORE, FAILED, (0), (0));
987 }
988
989 void StreamingClient::handleNotifyFinished()
990 {
991     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
992     WebKitWebSrcPrivate* priv = src->priv;
993
994     GST_DEBUG_OBJECT(src, "Have EOS");
995
996     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
997     if (!priv->isSeeking) {
998         locker.unlock();
999         gst_app_src_end_of_stream(priv->appsrc);
1000     }
1001 }
1002
1003 CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src)
1004     : StreamingClient(src)
1005 {
1006 }
1007
1008 CachedResourceStreamingClient::~CachedResourceStreamingClient()
1009 {
1010 }
1011
1012 #if USE(SOUP)
1013 char* CachedResourceStreamingClient::getOrCreateReadBuffer(PlatformMediaResource&, size_t requestedSize, size_t& actualSize)
1014 {
1015     return createReadBuffer(requestedSize, actualSize);
1016 }
1017 #endif
1018
1019 void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, const ResourceResponse& response)
1020 {
1021     WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC(m_src)->priv;
1022     priv->didPassAccessControlCheck = priv->resource->didPassAccessControlCheck();
1023     handleResponseReceived(response);
1024 }
1025
1026 void CachedResourceStreamingClient::dataReceived(PlatformMediaResource&, const char* data, int length)
1027 {
1028     handleDataReceived(data, length);
1029 }
1030
1031 void CachedResourceStreamingClient::accessControlCheckFailed(PlatformMediaResource&, const ResourceError& error)
1032 {
1033     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1034     GST_ELEMENT_ERROR(src, RESOURCE, READ, ("%s", error.localizedDescription().utf8().data()), (nullptr));
1035     gst_app_src_end_of_stream(src->priv->appsrc);
1036     webKitWebSrcStop(src);
1037 }
1038
1039 void CachedResourceStreamingClient::loadFailed(PlatformMediaResource&, const ResourceError& error)
1040 {
1041     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1042
1043     if (!error.isCancellation()) {
1044         GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
1045         GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
1046     }
1047
1048     gst_app_src_end_of_stream(src->priv->appsrc);
1049 }
1050
1051 void CachedResourceStreamingClient::loadFinished(PlatformMediaResource&)
1052 {
1053     handleNotifyFinished();
1054 }
1055
1056 ResourceHandleStreamingClient::ResourceHandleStreamingClient(WebKitWebSrc* src, const ResourceRequest& request)
1057     : StreamingClient(src)
1058 {
1059     m_resource = ResourceHandle::create(0 /*context*/, request, this, false, false);
1060 }
1061
1062 ResourceHandleStreamingClient::~ResourceHandleStreamingClient()
1063 {
1064     if (m_resource) {
1065         m_resource->cancel();
1066         m_resource = nullptr;
1067     }
1068 }
1069
1070 bool ResourceHandleStreamingClient::loadFailed() const
1071 {
1072     return !m_resource;
1073 }
1074
1075 void ResourceHandleStreamingClient::setDefersLoading(bool defers)
1076 {
1077     if (m_resource)
1078         m_resource->setDefersLoading(defers);
1079 }
1080
1081 #if USE(SOUP)
1082 char* ResourceHandleStreamingClient::getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize)
1083 {
1084     return createReadBuffer(requestedSize, actualSize);
1085 }
1086 #endif
1087
1088 void ResourceHandleStreamingClient::willSendRequest(ResourceHandle*, ResourceRequest&, const ResourceResponse&)
1089 {
1090 }
1091
1092 void ResourceHandleStreamingClient::didReceiveResponse(ResourceHandle*, const ResourceResponse& response)
1093 {
1094     handleResponseReceived(response);
1095 }
1096
1097 void ResourceHandleStreamingClient::didReceiveData(ResourceHandle*, const char* /* data */, unsigned /* length */, int)
1098 {
1099     ASSERT_NOT_REACHED();
1100 }
1101
1102 void ResourceHandleStreamingClient::didReceiveBuffer(ResourceHandle*, PassRefPtr<SharedBuffer> buffer, int /* encodedLength */)
1103 {
1104     // This pattern is suggested by SharedBuffer.h.
1105     const char* segment;
1106     unsigned position = 0;
1107     while (unsigned length = buffer->getSomeData(segment, position)) {
1108         handleDataReceived(segment, length);
1109         position += length;
1110     }
1111 }
1112
1113 void ResourceHandleStreamingClient::didFinishLoading(ResourceHandle*, double)
1114 {
1115     handleNotifyFinished();
1116 }
1117
1118 void ResourceHandleStreamingClient::didFail(ResourceHandle*, const ResourceError& error)
1119 {
1120     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1121
1122     GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
1123     GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (0));
1124     gst_app_src_end_of_stream(src->priv->appsrc);
1125 }
1126
1127 void ResourceHandleStreamingClient::wasBlocked(ResourceHandle*)
1128 {
1129     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1130     GUniquePtr<gchar> uri;
1131
1132     GST_ERROR_OBJECT(src, "Request was blocked");
1133
1134     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
1135     uri.reset(g_strdup(src->priv->uri));
1136     locker.unlock();
1137
1138     GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, ("Access to \"%s\" was blocked", uri.get()), (0));
1139 }
1140
1141 void ResourceHandleStreamingClient::cannotShowURL(ResourceHandle*)
1142 {
1143     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1144     GUniquePtr<gchar> uri;
1145
1146     GST_ERROR_OBJECT(src, "Cannot show URL");
1147
1148     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
1149     uri.reset(g_strdup(src->priv->uri));
1150     locker.unlock();
1151
1152     GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, ("Can't show \"%s\"", uri.get()), (0));
1153 }
1154
1155 #endif // USE(GSTREAMER)
1156