[GStreamer] Use MainThreadNotifier to send notifications to main thread in WebKitWebS...
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / WebKitWebSourceGStreamer.cpp
1 /*
2  *  Copyright (C) 2009, 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *  Copyright (C) 2013 Collabora Ltd.
4  *
5  *  This library is free software; you can redistribute it and/or
6  *  modify it under the terms of the GNU Lesser General Public
7  *  License as published by the Free Software Foundation; either
8  *  version 2 of the License, or (at your option) any later version.
9  *
10  *  This library is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  *  Lesser General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Lesser General Public
16  *  License along with this library; if not, write to the Free Software
17  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19
20 #include "config.h"
21 #include "WebKitWebSourceGStreamer.h"
22
23 #if ENABLE(VIDEO) && USE(GSTREAMER)
24
25 #include "GRefPtrGStreamer.h"
26 #include "GStreamerUtilities.h"
27 #include "GUniquePtrGStreamer.h"
28 #include "HTTPHeaderNames.h"
29 #include "MainThreadNotifier.h"
30 #include "MediaPlayer.h"
31 #include "NotImplemented.h"
32 #include "PlatformMediaResourceLoader.h"
33 #include "ResourceError.h"
34 #include "ResourceHandle.h"
35 #include "ResourceHandleClient.h"
36 #include "ResourceRequest.h"
37 #include "ResourceResponse.h"
38 #include "SharedBuffer.h"
39 #include <gst/app/gstappsrc.h>
40 #include <gst/gst.h>
41 #include <gst/pbutils/missing-plugins.h>
42 #include <wtf/MainThread.h>
43 #include <wtf/Noncopyable.h>
44 #include <wtf/glib/GMutexLocker.h>
45 #include <wtf/glib/GRefPtr.h>
46 #include <wtf/glib/GUniquePtr.h>
47 #include <wtf/text/CString.h>
48
49 using namespace WebCore;
50
51 class StreamingClient {
52     public:
53         StreamingClient(WebKitWebSrc*);
54         virtual ~StreamingClient();
55
56     protected:
57         char* createReadBuffer(size_t requestedSize, size_t& actualSize);
58         void handleResponseReceived(const ResourceResponse&);
59         void handleDataReceived(const char*, int);
60         void handleNotifyFinished();
61
62         GstElement* m_src;
63 };
64
65 class CachedResourceStreamingClient final : public PlatformMediaResourceLoaderClient, public StreamingClient {
66     WTF_MAKE_NONCOPYABLE(CachedResourceStreamingClient);
67     public:
68         CachedResourceStreamingClient(WebKitWebSrc*);
69         virtual ~CachedResourceStreamingClient();
70
71     private:
72         // PlatformMediaResourceLoaderClient virtual methods.
73 #if USE(SOUP)
74         virtual char* getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize) override;
75 #endif
76         virtual void responseReceived(const ResourceResponse&) override;
77         virtual void dataReceived(const char*, int) override;
78         virtual void accessControlCheckFailed(const ResourceError&) override;
79         virtual void loadFailed(const ResourceError&) override;
80         virtual void loadFinished() override;
81 };
82
83 class ResourceHandleStreamingClient : public ResourceHandleClient, public StreamingClient {
84     WTF_MAKE_NONCOPYABLE(ResourceHandleStreamingClient); WTF_MAKE_FAST_ALLOCATED;
85     public:
86         ResourceHandleStreamingClient(WebKitWebSrc*, const ResourceRequest&);
87         virtual ~ResourceHandleStreamingClient();
88
89         // StreamingClient virtual methods.
90         bool loadFailed() const;
91         void setDefersLoading(bool);
92
93     private:
94         // ResourceHandleClient virtual methods.
95         virtual char* getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize);
96         virtual void willSendRequest(ResourceHandle*, ResourceRequest&, const ResourceResponse&);
97         virtual void didReceiveResponse(ResourceHandle*, const ResourceResponse&);
98         virtual void didReceiveData(ResourceHandle*, const char*, unsigned, int);
99         virtual void didReceiveBuffer(ResourceHandle*, PassRefPtr<SharedBuffer>, int encodedLength);
100         virtual void didFinishLoading(ResourceHandle*, double /*finishTime*/);
101         virtual void didFail(ResourceHandle*, const ResourceError&);
102         virtual void wasBlocked(ResourceHandle*);
103         virtual void cannotShowURL(ResourceHandle*);
104
105         RefPtr<ResourceHandle> m_resource;
106 };
107
108 enum MainThreadSourceNotification {
109     Start = 1 << 0,
110     Stop = 1 << 1,
111     NeedData = 1 << 2,
112     EnoughData = 1 << 3,
113     Seek = 1 << 4
114 };
115
116 #define WEBKIT_WEB_SRC_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), WEBKIT_TYPE_WEB_SRC, WebKitWebSrcPrivate))
117 struct _WebKitWebSrcPrivate {
118     GstAppSrc* appsrc;
119     GstPad* srcpad;
120     gchar* uri;
121     bool keepAlive;
122     GUniquePtr<GstStructure> extraHeaders;
123     bool compress;
124     GUniquePtr<gchar> httpMethod;
125
126     WebCore::MediaPlayer* player;
127
128     RefPtr<PlatformMediaResourceLoader> loader;
129     ResourceHandleStreamingClient* client;
130
131     bool didPassAccessControlCheck;
132
133     guint64 offset;
134     guint64 size;
135     gboolean seekable;
136     gboolean paused;
137     bool isSeeking;
138
139     guint64 requestedOffset;
140
141     MainThreadNotifier<MainThreadSourceNotification> notifier;
142     GRefPtr<GstBuffer> buffer;
143 };
144
145 enum {
146     PROP_0,
147     PROP_LOCATION,
148     PROP_KEEP_ALIVE,
149     PROP_EXTRA_HEADERS,
150     PROP_COMPRESS,
151     PROP_METHOD
152 };
153
154 static GstStaticPadTemplate srcTemplate = GST_STATIC_PAD_TEMPLATE("src",
155                                                                   GST_PAD_SRC,
156                                                                   GST_PAD_ALWAYS,
157                                                                   GST_STATIC_CAPS_ANY);
158
159 GST_DEBUG_CATEGORY_STATIC(webkit_web_src_debug);
160 #define GST_CAT_DEFAULT webkit_web_src_debug
161
162 static void webKitWebSrcUriHandlerInit(gpointer gIface, gpointer ifaceData);
163
164 static void webKitWebSrcDispose(GObject*);
165 static void webKitWebSrcFinalize(GObject*);
166 static void webKitWebSrcSetProperty(GObject*, guint propertyID, const GValue*, GParamSpec*);
167 static void webKitWebSrcGetProperty(GObject*, guint propertyID, GValue*, GParamSpec*);
168 static GstStateChangeReturn webKitWebSrcChangeState(GstElement*, GstStateChange);
169
170 static gboolean webKitWebSrcQueryWithParent(GstPad*, GstObject*, GstQuery*);
171
172 static void webKitWebSrcNeedData(WebKitWebSrc*);
173 static void webKitWebSrcEnoughData(WebKitWebSrc*);
174 static void webKitWebSrcSeek(WebKitWebSrc*);
175
176 static GstAppSrcCallbacks appsrcCallbacks = {
177     // need_data
178     [](GstAppSrc*, guint, gpointer userData) {
179         WebKitWebSrc* src = WEBKIT_WEB_SRC(userData);
180         WebKitWebSrcPrivate* priv = src->priv;
181
182         {
183             WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
184             if (!priv->paused)
185                 return;
186         }
187
188         GRefPtr<WebKitWebSrc> protector(src);
189         priv->notifier.notify(MainThreadSourceNotification::NeedData, [protector] { webKitWebSrcNeedData(protector.get()); });
190     },
191     // enough_data
192     [](GstAppSrc*, gpointer userData) {
193         WebKitWebSrc* src = WEBKIT_WEB_SRC(userData);
194         WebKitWebSrcPrivate* priv = src->priv;
195
196         {
197             WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
198             if (priv->paused)
199                 return;
200         }
201
202         GRefPtr<WebKitWebSrc> protector(src);
203         priv->notifier.notify(MainThreadSourceNotification::EnoughData, [protector] { webKitWebSrcEnoughData(protector.get()); });
204     },
205     // seek_data
206     [](GstAppSrc*, guint64 offset, gpointer userData) -> gboolean {
207         WebKitWebSrc* src = WEBKIT_WEB_SRC(userData);
208         WebKitWebSrcPrivate* priv = src->priv;
209
210         {
211             WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
212             if (offset == priv->offset && priv->requestedOffset == priv->offset)
213                 return TRUE;
214
215             if (!priv->seekable)
216                 return FALSE;
217
218             priv->isSeeking = true;
219             priv->requestedOffset = offset;
220         }
221
222         GRefPtr<WebKitWebSrc> protector(src);
223         priv->notifier.notify(MainThreadSourceNotification::Seek, [protector] { webKitWebSrcSeek(protector.get()); });
224         return TRUE;
225     },
226     { nullptr }
227 };
228
229 #define webkit_web_src_parent_class parent_class
230 // We split this out into another macro to avoid a check-webkit-style error.
231 #define WEBKIT_WEB_SRC_CATEGORY_INIT GST_DEBUG_CATEGORY_INIT(webkit_web_src_debug, "webkitwebsrc", 0, "websrc element");
232 G_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_BIN,
233                          G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, webKitWebSrcUriHandlerInit);
234                          WEBKIT_WEB_SRC_CATEGORY_INIT);
235
236 static void webkit_web_src_class_init(WebKitWebSrcClass* klass)
237 {
238     GObjectClass* oklass = G_OBJECT_CLASS(klass);
239     GstElementClass* eklass = GST_ELEMENT_CLASS(klass);
240
241     oklass->dispose = webKitWebSrcDispose;
242     oklass->finalize = webKitWebSrcFinalize;
243     oklass->set_property = webKitWebSrcSetProperty;
244     oklass->get_property = webKitWebSrcGetProperty;
245
246     gst_element_class_add_pad_template(eklass,
247                                        gst_static_pad_template_get(&srcTemplate));
248     gst_element_class_set_metadata(eklass, "WebKit Web source element", "Source", "Handles HTTP/HTTPS uris",
249                                "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
250
251     /* Allows setting the uri using the 'location' property, which is used
252      * for example by gst_element_make_from_uri() */
253     g_object_class_install_property(oklass,
254                                     PROP_LOCATION,
255                                     g_param_spec_string("location",
256                                                         "location",
257                                                         "Location to read from",
258                                                         0,
259                                                         (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
260
261     g_object_class_install_property(oklass, PROP_KEEP_ALIVE,
262         g_param_spec_boolean("keep-alive", "keep-alive", "Use HTTP persistent connections",
263             FALSE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
264
265     g_object_class_install_property(oklass, PROP_EXTRA_HEADERS,
266         g_param_spec_boxed("extra-headers", "Extra Headers", "Extra headers to append to the HTTP request",
267             GST_TYPE_STRUCTURE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
268
269     g_object_class_install_property(oklass, PROP_COMPRESS,
270         g_param_spec_boolean("compress", "Compress", "Allow compressed content encodings",
271             FALSE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
272
273     g_object_class_install_property(oklass, PROP_METHOD,
274         g_param_spec_string("method", "method", "The HTTP method to use (default: GET)",
275             nullptr, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
276
277     eklass->change_state = webKitWebSrcChangeState;
278
279     g_type_class_add_private(klass, sizeof(WebKitWebSrcPrivate));
280 }
281
282 static void webkit_web_src_init(WebKitWebSrc* src)
283 {
284     WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC_GET_PRIVATE(src);
285
286     src->priv = priv;
287     new (priv) WebKitWebSrcPrivate();
288
289     priv->appsrc = GST_APP_SRC(gst_element_factory_make("appsrc", 0));
290     if (!priv->appsrc) {
291         GST_ERROR_OBJECT(src, "Failed to create appsrc");
292         return;
293     }
294
295     gst_bin_add(GST_BIN(src), GST_ELEMENT(priv->appsrc));
296
297
298     GRefPtr<GstPad> targetPad = adoptGRef(gst_element_get_static_pad(GST_ELEMENT(priv->appsrc), "src"));
299     priv->srcpad = webkitGstGhostPadFromStaticTemplate(&srcTemplate, "src", targetPad.get());
300
301     gst_element_add_pad(GST_ELEMENT(src), priv->srcpad);
302
303     GST_OBJECT_FLAG_SET(priv->srcpad, GST_PAD_FLAG_NEED_PARENT);
304     gst_pad_set_query_function(priv->srcpad, webKitWebSrcQueryWithParent);
305
306     gst_app_src_set_callbacks(priv->appsrc, &appsrcCallbacks, src, 0);
307     gst_app_src_set_emit_signals(priv->appsrc, FALSE);
308     gst_app_src_set_stream_type(priv->appsrc, GST_APP_STREAM_TYPE_SEEKABLE);
309
310     // 512k is a abitrary number but we should choose a value
311     // here to not pause/unpause the SoupMessage too often and
312     // to make sure there's always some data available for
313     // GStreamer to handle.
314     gst_app_src_set_max_bytes(priv->appsrc, 512 * 1024);
315
316     // Emit the need-data signal if the queue contains less
317     // than 20% of data. Without this the need-data signal
318     // is emitted when the queue is empty, we then dispatch
319     // the soup message unpausing to the main loop and from
320     // there unpause the soup message. This already takes
321     // quite some time and libsoup even needs some more time
322     // to actually provide data again. If we do all this
323     // already if the queue is 20% empty, it's much more
324     // likely that libsoup already provides new data before
325     // the queue is really empty.
326     // This might need tweaking for ports not using libsoup.
327     g_object_set(priv->appsrc, "min-percent", 20, NULL);
328
329     gst_app_src_set_caps(priv->appsrc, 0);
330     gst_app_src_set_size(priv->appsrc, -1);
331 }
332
333 static void webKitWebSrcDispose(GObject* object)
334 {
335     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
336     WebKitWebSrcPrivate* priv = src->priv;
337
338     priv->player = 0;
339
340     GST_CALL_PARENT(G_OBJECT_CLASS, dispose, (object));
341 }
342
343 static void webKitWebSrcFinalize(GObject* object)
344 {
345     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
346     WebKitWebSrcPrivate* priv = src->priv;
347
348     g_free(priv->uri);
349     priv->~WebKitWebSrcPrivate();
350
351     GST_CALL_PARENT(G_OBJECT_CLASS, finalize, (object));
352 }
353
354 static void webKitWebSrcSetProperty(GObject* object, guint propID, const GValue* value, GParamSpec* pspec)
355 {
356     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
357
358     switch (propID) {
359     case PROP_LOCATION:
360         gst_uri_handler_set_uri(reinterpret_cast<GstURIHandler*>(src), g_value_get_string(value), 0);
361         break;
362     case PROP_KEEP_ALIVE:
363         src->priv->keepAlive = g_value_get_boolean(value);
364         break;
365     case PROP_EXTRA_HEADERS: {
366         const GstStructure* s = gst_value_get_structure(value);
367         src->priv->extraHeaders.reset(s ? gst_structure_copy(s) : nullptr);
368         break;
369     }
370     case PROP_COMPRESS:
371         src->priv->compress = g_value_get_boolean(value);
372         break;
373     case PROP_METHOD:
374         src->priv->httpMethod.reset(g_value_dup_string(value));
375         break;
376     default:
377         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propID, pspec);
378         break;
379     }
380 }
381
382 static void webKitWebSrcGetProperty(GObject* object, guint propID, GValue* value, GParamSpec* pspec)
383 {
384     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
385     WebKitWebSrcPrivate* priv = src->priv;
386
387     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
388     switch (propID) {
389     case PROP_LOCATION:
390         g_value_set_string(value, priv->uri);
391         break;
392     case PROP_KEEP_ALIVE:
393         g_value_set_boolean(value, priv->keepAlive);
394         break;
395     case PROP_EXTRA_HEADERS:
396         gst_value_set_structure(value, priv->extraHeaders.get());
397         break;
398     case PROP_COMPRESS:
399         g_value_set_boolean(value, priv->compress);
400         break;
401     case PROP_METHOD:
402         g_value_set_string(value, priv->httpMethod.get());
403         break;
404     default:
405         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propID, pspec);
406         break;
407     }
408 }
409
410 static void webKitWebSrcStop(WebKitWebSrc* src)
411 {
412     WebKitWebSrcPrivate* priv = src->priv;
413
414     ASSERT(isMainThread());
415
416     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
417
418     bool wasSeeking = std::exchange(priv->isSeeking, false);
419
420     priv->notifier.cancelPendingNotifications(MainThreadSourceNotification::NeedData | MainThreadSourceNotification::EnoughData | MainThreadSourceNotification::Seek);
421
422     if (priv->client) {
423         delete priv->client;
424         priv->client = 0;
425     }
426
427     if (!priv->keepAlive)
428         priv->loader = nullptr;
429
430     if (priv->buffer) {
431         unmapGstBuffer(priv->buffer.get());
432         priv->buffer.clear();
433     }
434
435     priv->paused = FALSE;
436
437     priv->offset = 0;
438     priv->seekable = FALSE;
439
440     if (!wasSeeking) {
441         priv->size = 0;
442         priv->requestedOffset = 0;
443         priv->player = 0;
444     }
445
446     locker.unlock();
447
448     if (priv->appsrc) {
449         gst_app_src_set_caps(priv->appsrc, 0);
450         if (!wasSeeking)
451             gst_app_src_set_size(priv->appsrc, -1);
452     }
453
454     GST_DEBUG_OBJECT(src, "Stopped request");
455 }
456
457 static bool webKitWebSrcSetExtraHeader(GQuark fieldId, const GValue* value, gpointer userData)
458 {
459     GUniquePtr<gchar> fieldContent;
460
461     if (G_VALUE_HOLDS_STRING(value))
462         fieldContent.reset(g_value_dup_string(value));
463     else {
464         GValue dest = G_VALUE_INIT;
465
466         g_value_init(&dest, G_TYPE_STRING);
467         if (g_value_transform(value, &dest))
468             fieldContent.reset(g_value_dup_string(&dest));
469     }
470
471     const gchar* fieldName = g_quark_to_string(fieldId);
472     if (!fieldContent.get()) {
473         GST_ERROR("extra-headers field '%s' contains no value or can't be converted to a string", fieldName);
474         return false;
475     }
476
477     GST_DEBUG("Appending extra header: \"%s: %s\"", fieldName, fieldContent.get());
478     ResourceRequest* request = static_cast<ResourceRequest*>(userData);
479     request->setHTTPHeaderField(fieldName, fieldContent.get());
480     return true;
481 }
482
483 static gboolean webKitWebSrcProcessExtraHeaders(GQuark fieldId, const GValue* value, gpointer userData)
484 {
485     if (G_VALUE_TYPE(value) == GST_TYPE_ARRAY) {
486         unsigned size = gst_value_array_get_size(value);
487
488         for (unsigned i = 0; i < size; i++) {
489             if (!webKitWebSrcSetExtraHeader(fieldId, gst_value_array_get_value(value, i), userData))
490                 return FALSE;
491         }
492         return TRUE;
493     }
494
495     if (G_VALUE_TYPE(value) == GST_TYPE_LIST) {
496         unsigned size = gst_value_list_get_size(value);
497
498         for (unsigned i = 0; i < size; i++) {
499             if (!webKitWebSrcSetExtraHeader(fieldId, gst_value_list_get_value(value, i), userData))
500                 return FALSE;
501         }
502         return TRUE;
503     }
504
505     return webKitWebSrcSetExtraHeader(fieldId, value, userData);
506 }
507
508 static void webKitWebSrcStart(WebKitWebSrc* src)
509 {
510     WebKitWebSrcPrivate* priv = src->priv;
511
512     ASSERT(isMainThread());
513
514     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
515
516     priv->didPassAccessControlCheck = false;
517
518     if (!priv->uri) {
519         GST_ERROR_OBJECT(src, "No URI provided");
520         locker.unlock();
521         webKitWebSrcStop(src);
522         return;
523     }
524
525     ASSERT(!priv->client);
526
527     GST_DEBUG_OBJECT(src, "Fetching %s", priv->uri);
528     URL url = URL(URL(), priv->uri);
529
530     ResourceRequest request(url);
531     request.setAllowCookies(true);
532     request.setFirstPartyForCookies(url);
533
534     priv->size = 0;
535
536     if (priv->player)
537         request.setHTTPReferrer(priv->player->referrer());
538
539     if (priv->httpMethod.get())
540         request.setHTTPMethod(priv->httpMethod.get());
541
542 #if USE(SOUP)
543     // By default, HTTP Accept-Encoding is disabled here as we don't
544     // want the received response to be encoded in any way as we need
545     // to rely on the proper size of the returned data on
546     // didReceiveResponse.
547     // If Accept-Encoding is used, the server may send the data in encoded format and
548     // request.expectedContentLength() will have the "wrong" size (the size of the
549     // compressed data), even though the data received in didReceiveData is uncompressed.
550     // This is however useful to enable for adaptive streaming
551     // scenarios, when the demuxer needs to download playlists.
552     if (!priv->compress)
553         request.setAcceptEncoding(false);
554 #endif
555
556     // Let Apple web servers know we want to access their nice movie trailers.
557     if (!g_ascii_strcasecmp("movies.apple.com", url.host().utf8().data())
558         || !g_ascii_strcasecmp("trailers.apple.com", url.host().utf8().data()))
559         request.setHTTPUserAgent("Quicktime/7.6.6");
560
561     if (priv->requestedOffset) {
562         GUniquePtr<gchar> val(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", priv->requestedOffset));
563         request.setHTTPHeaderField(HTTPHeaderName::Range, val.get());
564     }
565     priv->offset = priv->requestedOffset;
566
567     if (!priv->keepAlive) {
568         GST_DEBUG_OBJECT(src, "Persistent connection support disabled");
569         request.setHTTPHeaderField(HTTPHeaderName::Connection, "close");
570     }
571
572     if (priv->extraHeaders)
573         gst_structure_foreach(priv->extraHeaders.get(), webKitWebSrcProcessExtraHeaders, &request);
574
575     // We always request Icecast/Shoutcast metadata, just in case ...
576     request.setHTTPHeaderField(HTTPHeaderName::IcyMetadata, "1");
577
578     bool loadFailed = true;
579     if (priv->player && !priv->loader)
580         priv->loader = priv->player->createResourceLoader(std::make_unique<CachedResourceStreamingClient>(src));
581
582     if (priv->loader) {
583         PlatformMediaResourceLoader::LoadOptions loadOptions = 0;
584         if (request.url().protocolIs("blob"))
585             loadOptions |= PlatformMediaResourceLoader::LoadOption::BufferData;
586         loadFailed = !priv->loader->start(request, loadOptions);
587     } else {
588         priv->client = new ResourceHandleStreamingClient(src, request);
589         loadFailed = priv->client->loadFailed();
590     }
591
592     if (loadFailed) {
593         GST_ERROR_OBJECT(src, "Failed to setup streaming client");
594         if (priv->client) {
595             delete priv->client;
596             priv->client = nullptr;
597         }
598         priv->loader = nullptr;
599         locker.unlock();
600         webKitWebSrcStop(src);
601         return;
602     }
603     GST_DEBUG_OBJECT(src, "Started request");
604 }
605
606 static GstStateChangeReturn webKitWebSrcChangeState(GstElement* element, GstStateChange transition)
607 {
608     GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
609     WebKitWebSrc* src = WEBKIT_WEB_SRC(element);
610     WebKitWebSrcPrivate* priv = src->priv;
611
612     switch (transition) {
613     case GST_STATE_CHANGE_NULL_TO_READY:
614         if (!priv->appsrc) {
615             gst_element_post_message(element,
616                                      gst_missing_element_message_new(element, "appsrc"));
617             GST_ELEMENT_ERROR(src, CORE, MISSING_PLUGIN, (0), ("no appsrc"));
618             return GST_STATE_CHANGE_FAILURE;
619         }
620         break;
621     default:
622         break;
623     }
624
625     ret = GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
626     if (G_UNLIKELY(ret == GST_STATE_CHANGE_FAILURE)) {
627         GST_DEBUG_OBJECT(src, "State change failed");
628         return ret;
629     }
630
631     switch (transition) {
632     case GST_STATE_CHANGE_READY_TO_PAUSED:
633     {
634         GST_DEBUG_OBJECT(src, "READY->PAUSED");
635         GRefPtr<WebKitWebSrc> protector(src);
636         priv->notifier.notify(MainThreadSourceNotification::Start, [protector] { webKitWebSrcStart(protector.get()); });
637         break;
638     }
639     case GST_STATE_CHANGE_PAUSED_TO_READY:
640     {
641         GST_DEBUG_OBJECT(src, "PAUSED->READY");
642         priv->notifier.cancelPendingNotifications();
643         GRefPtr<WebKitWebSrc> protector(src);
644         priv->notifier.notify(MainThreadSourceNotification::Stop, [protector] { webKitWebSrcStop(protector.get()); });
645         break;
646     }
647     default:
648         break;
649     }
650
651     return ret;
652 }
653
654 static gboolean webKitWebSrcQueryWithParent(GstPad* pad, GstObject* parent, GstQuery* query)
655 {
656     WebKitWebSrc* src = WEBKIT_WEB_SRC(GST_ELEMENT(parent));
657     gboolean result = FALSE;
658
659     switch (GST_QUERY_TYPE(query)) {
660     case GST_QUERY_DURATION: {
661         GstFormat format;
662
663         gst_query_parse_duration(query, &format, NULL);
664
665         GST_DEBUG_OBJECT(src, "duration query in format %s", gst_format_get_name(format));
666         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
667         if (format == GST_FORMAT_BYTES && src->priv->size > 0) {
668             gst_query_set_duration(query, format, src->priv->size);
669             result = TRUE;
670         }
671         break;
672     }
673     case GST_QUERY_URI: {
674         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
675         gst_query_set_uri(query, src->priv->uri);
676         result = TRUE;
677         break;
678     }
679 #if GST_CHECK_VERSION(1, 2, 0)
680     case GST_QUERY_SCHEDULING: {
681         GstSchedulingFlags flags;
682         int minSize, maxSize, align;
683
684         gst_query_parse_scheduling(query, &flags, &minSize, &maxSize, &align);
685         gst_query_set_scheduling(query, static_cast<GstSchedulingFlags>(flags | GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED), minSize, maxSize, align);
686         result = TRUE;
687         break;
688     }
689 #endif
690     default: {
691         GRefPtr<GstPad> target = adoptGRef(gst_ghost_pad_get_target(GST_GHOST_PAD_CAST(pad)));
692
693         // Forward the query to the proxy target pad.
694         if (target)
695             result = gst_pad_query(target.get(), query);
696         break;
697     }
698     }
699
700     return result;
701 }
702
703 static bool urlHasSupportedProtocol(const URL& url)
704 {
705     return url.isValid() && (url.protocolIsInHTTPFamily() || url.protocolIs("blob"));
706 }
707
708 // uri handler interface
709
710 static GstURIType webKitWebSrcUriGetType(GType)
711 {
712     return GST_URI_SRC;
713 }
714
715 const gchar* const* webKitWebSrcGetProtocols(GType)
716 {
717     static const char* protocols[] = {"http", "https", "blob", 0 };
718     return protocols;
719 }
720
721 static gchar* webKitWebSrcGetUri(GstURIHandler* handler)
722 {
723     WebKitWebSrc* src = WEBKIT_WEB_SRC(handler);
724     gchar* ret;
725
726     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
727     ret = g_strdup(src->priv->uri);
728     return ret;
729 }
730
731 static gboolean webKitWebSrcSetUri(GstURIHandler* handler, const gchar* uri, GError** error)
732 {
733     WebKitWebSrc* src = WEBKIT_WEB_SRC(handler);
734     WebKitWebSrcPrivate* priv = src->priv;
735
736     if (GST_STATE(src) >= GST_STATE_PAUSED) {
737         GST_ERROR_OBJECT(src, "URI can only be set in states < PAUSED");
738         return FALSE;
739     }
740
741     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
742
743     g_free(priv->uri);
744     priv->uri = 0;
745
746     if (!uri)
747         return TRUE;
748
749     URL url(URL(), uri);
750     if (!urlHasSupportedProtocol(url)) {
751         g_set_error(error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, "Invalid URI '%s'", uri);
752         return FALSE;
753     }
754
755     priv->uri = g_strdup(url.string().utf8().data());
756     return TRUE;
757 }
758
759 static void webKitWebSrcUriHandlerInit(gpointer gIface, gpointer)
760 {
761     GstURIHandlerInterface* iface = (GstURIHandlerInterface *) gIface;
762
763     iface->get_type = webKitWebSrcUriGetType;
764     iface->get_protocols = webKitWebSrcGetProtocols;
765     iface->get_uri = webKitWebSrcGetUri;
766     iface->set_uri = webKitWebSrcSetUri;
767 }
768
769 // appsrc callbacks
770
771 static void webKitWebSrcNeedData(WebKitWebSrc* src)
772 {
773     WebKitWebSrcPrivate* priv = src->priv;
774
775     ASSERT(isMainThread());
776
777     GST_DEBUG_OBJECT(src, "Need more data");
778
779     {
780         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
781         priv->paused = FALSE;
782     }
783
784     if (priv->client)
785         priv->client->setDefersLoading(false);
786     if (priv->loader)
787         priv->loader->setDefersLoading(false);
788 }
789
790 static void webKitWebSrcEnoughData(WebKitWebSrc* src)
791 {
792     WebKitWebSrcPrivate* priv = src->priv;
793
794     ASSERT(isMainThread());
795
796     GST_DEBUG_OBJECT(src, "Have enough data");
797
798     {
799         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
800         priv->paused = TRUE;
801     }
802
803     if (priv->client)
804         priv->client->setDefersLoading(true);
805     if (priv->loader)
806         priv->loader->setDefersLoading(true);
807 }
808
809 static void webKitWebSrcSeek(WebKitWebSrc* src)
810 {
811     ASSERT(isMainThread());
812
813     GST_DEBUG_OBJECT(src, "Seeking to offset: %" G_GUINT64_FORMAT, src->priv->requestedOffset);
814
815     webKitWebSrcStop(src);
816     webKitWebSrcStart(src);
817 }
818
819 void webKitWebSrcSetMediaPlayer(WebKitWebSrc* src, WebCore::MediaPlayer* player)
820 {
821     ASSERT(player);
822     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
823     src->priv->player = player;
824 }
825
826 bool webKitSrcPassedCORSAccessCheck(WebKitWebSrc* src)
827 {
828     return src->priv->didPassAccessControlCheck;
829 }
830
831 StreamingClient::StreamingClient(WebKitWebSrc* src)
832     : m_src(static_cast<GstElement*>(gst_object_ref(src)))
833 {
834 }
835
836 StreamingClient::~StreamingClient()
837 {
838     gst_object_unref(m_src);
839 }
840
841 char* StreamingClient::createReadBuffer(size_t requestedSize, size_t& actualSize)
842 {
843     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
844     WebKitWebSrcPrivate* priv = src->priv;
845
846     ASSERT(!priv->buffer);
847
848     GstBuffer* buffer = gst_buffer_new_and_alloc(requestedSize);
849
850     mapGstBuffer(buffer);
851
852     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
853     priv->buffer = adoptGRef(buffer);
854     locker.unlock();
855
856     actualSize = gst_buffer_get_size(buffer);
857     return getGstBufferDataPointer(buffer);
858 }
859
860 void StreamingClient::handleResponseReceived(const ResourceResponse& response)
861 {
862     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
863     WebKitWebSrcPrivate* priv = src->priv;
864
865     GST_DEBUG_OBJECT(src, "Received response: %d", response.httpStatusCode());
866
867     if (response.httpStatusCode() >= 400) {
868         GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received %d HTTP error code", response.httpStatusCode()), (nullptr));
869         gst_app_src_end_of_stream(priv->appsrc);
870         webKitWebSrcStop(src);
871         return;
872     }
873
874     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
875
876     if (priv->isSeeking) {
877         GST_DEBUG_OBJECT(src, "Seek in progress, ignoring response");
878         return;
879     }
880
881     if (priv->requestedOffset) {
882         // Seeking ... we expect a 206 == PARTIAL_CONTENT
883         if (response.httpStatusCode() == 200) {
884             // Range request didn't have a ranged response; resetting offset.
885             priv->offset = 0;
886         } else if (response.httpStatusCode() != 206) {
887             // Range request completely failed.
888             locker.unlock();
889             GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received unexpected %d HTTP status code", response.httpStatusCode()), (nullptr));
890             gst_app_src_end_of_stream(priv->appsrc);
891             webKitWebSrcStop(src);
892             return;
893         }
894     }
895
896     long long length = response.expectedContentLength();
897     if (length > 0 && priv->requestedOffset && response.httpStatusCode() == 206)
898         length += priv->requestedOffset;
899
900     priv->size = length >= 0 ? length : 0;
901     priv->seekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
902
903     locker.unlock();
904
905     // notify size/duration
906     if (length > 0) {
907         gst_app_src_set_size(priv->appsrc, length);
908     } else
909         gst_app_src_set_size(priv->appsrc, -1);
910
911     gst_app_src_set_caps(priv->appsrc, 0);
912 }
913
914 void StreamingClient::handleDataReceived(const char* data, int length)
915 {
916     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
917     WebKitWebSrcPrivate* priv = src->priv;
918
919     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
920
921     GST_LOG_OBJECT(src, "Have %lld bytes of data", priv->buffer ? static_cast<long long>(gst_buffer_get_size(priv->buffer.get())) : length);
922
923     ASSERT(!priv->buffer || data == getGstBufferDataPointer(priv->buffer.get()));
924
925     if (priv->buffer)
926         unmapGstBuffer(priv->buffer.get());
927
928     if (priv->isSeeking) {
929         GST_DEBUG_OBJECT(src, "Seek in progress, ignoring data");
930         priv->buffer.clear();
931         return;
932     }
933
934     if (priv->offset < priv->requestedOffset) {
935         // Range request failed; seeking manually.
936         if (priv->offset + length <= priv->requestedOffset) {
937             // Discard all the buffers coming before the requested seek position.
938             priv->offset += length;
939             priv->buffer.clear();
940             return;
941         }
942
943         if (priv->offset + length > priv->requestedOffset) {
944             guint64 offset = priv->requestedOffset - priv->offset;
945             data += offset;
946             length -= offset;
947             if (priv->buffer)
948                 gst_buffer_resize(priv->buffer.get(), offset, -1);
949             priv->offset = priv->requestedOffset;
950         }
951
952         priv->requestedOffset = 0;
953     }
954
955     // Ports using the GStreamer backend but not the soup implementation of ResourceHandle
956     // won't be using buffers provided by this client, the buffer is created here in that case.
957     if (!priv->buffer)
958         priv->buffer = adoptGRef(createGstBufferForData(data, length));
959     else
960         gst_buffer_set_size(priv->buffer.get(), static_cast<gssize>(length));
961
962     GST_BUFFER_OFFSET(priv->buffer.get()) = priv->offset;
963     if (priv->requestedOffset == priv->offset)
964         priv->requestedOffset += length;
965     priv->offset += length;
966     // priv->size == 0 if received length on didReceiveResponse < 0.
967     if (priv->size > 0 && priv->offset > priv->size) {
968         GST_DEBUG_OBJECT(src, "Updating internal size from %" G_GUINT64_FORMAT " to %" G_GUINT64_FORMAT, priv->size, priv->offset);
969         gst_app_src_set_size(priv->appsrc, priv->offset);
970         priv->size = priv->offset;
971     }
972     GST_BUFFER_OFFSET_END(priv->buffer.get()) = priv->offset;
973
974     locker.unlock();
975
976     GstFlowReturn ret = gst_app_src_push_buffer(priv->appsrc, priv->buffer.leakRef());
977     if (ret != GST_FLOW_OK && ret != GST_FLOW_EOS)
978         GST_ELEMENT_ERROR(src, CORE, FAILED, (0), (0));
979 }
980
981 void StreamingClient::handleNotifyFinished()
982 {
983     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
984     WebKitWebSrcPrivate* priv = src->priv;
985
986     GST_DEBUG_OBJECT(src, "Have EOS");
987
988     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
989     if (!priv->isSeeking) {
990         locker.unlock();
991         gst_app_src_end_of_stream(priv->appsrc);
992     }
993 }
994
995 CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src)
996     : StreamingClient(src)
997 {
998 }
999
1000 CachedResourceStreamingClient::~CachedResourceStreamingClient()
1001 {
1002 }
1003
1004 #if USE(SOUP)
1005 char* CachedResourceStreamingClient::getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize)
1006 {
1007     return createReadBuffer(requestedSize, actualSize);
1008 }
1009 #endif
1010
1011 void CachedResourceStreamingClient::responseReceived(const ResourceResponse& response)
1012 {
1013     WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC(m_src)->priv;
1014     priv->didPassAccessControlCheck = priv->loader->didPassAccessControlCheck();
1015     handleResponseReceived(response);
1016 }
1017
1018 void CachedResourceStreamingClient::dataReceived(const char* data, int length)
1019 {
1020     handleDataReceived(data, length);
1021 }
1022
1023 void CachedResourceStreamingClient::accessControlCheckFailed(const ResourceError& error)
1024 {
1025     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1026     GST_ELEMENT_ERROR(src, RESOURCE, READ, ("%s", error.localizedDescription().utf8().data()), (nullptr));
1027     gst_app_src_end_of_stream(src->priv->appsrc);
1028     webKitWebSrcStop(src);
1029 }
1030
1031 void CachedResourceStreamingClient::loadFailed(const ResourceError& error)
1032 {
1033     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1034
1035     if (!error.isCancellation()) {
1036         GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
1037         GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
1038     }
1039
1040     gst_app_src_end_of_stream(src->priv->appsrc);
1041 }
1042
1043 void CachedResourceStreamingClient::loadFinished()
1044 {
1045     handleNotifyFinished();
1046 }
1047
1048 ResourceHandleStreamingClient::ResourceHandleStreamingClient(WebKitWebSrc* src, const ResourceRequest& request)
1049     : StreamingClient(src)
1050 {
1051     m_resource = ResourceHandle::create(0 /*context*/, request, this, false, false);
1052 }
1053
1054 ResourceHandleStreamingClient::~ResourceHandleStreamingClient()
1055 {
1056     if (m_resource) {
1057         m_resource->cancel();
1058         m_resource = nullptr;
1059     }
1060 }
1061
1062 bool ResourceHandleStreamingClient::loadFailed() const
1063 {
1064     return !m_resource;
1065 }
1066
1067 void ResourceHandleStreamingClient::setDefersLoading(bool defers)
1068 {
1069     if (m_resource)
1070         m_resource->setDefersLoading(defers);
1071 }
1072
1073 char* ResourceHandleStreamingClient::getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize)
1074 {
1075     return createReadBuffer(requestedSize, actualSize);
1076 }
1077
1078 void ResourceHandleStreamingClient::willSendRequest(ResourceHandle*, ResourceRequest&, const ResourceResponse&)
1079 {
1080 }
1081
1082 void ResourceHandleStreamingClient::didReceiveResponse(ResourceHandle*, const ResourceResponse& response)
1083 {
1084     handleResponseReceived(response);
1085 }
1086
1087 void ResourceHandleStreamingClient::didReceiveData(ResourceHandle*, const char* /* data */, unsigned /* length */, int)
1088 {
1089     ASSERT_NOT_REACHED();
1090 }
1091
1092 void ResourceHandleStreamingClient::didReceiveBuffer(ResourceHandle*, PassRefPtr<SharedBuffer> buffer, int /* encodedLength */)
1093 {
1094     // This pattern is suggested by SharedBuffer.h.
1095     const char* segment;
1096     unsigned position = 0;
1097     while (unsigned length = buffer->getSomeData(segment, position)) {
1098         handleDataReceived(segment, length);
1099         position += length;
1100     }
1101 }
1102
1103 void ResourceHandleStreamingClient::didFinishLoading(ResourceHandle*, double)
1104 {
1105     handleNotifyFinished();
1106 }
1107
1108 void ResourceHandleStreamingClient::didFail(ResourceHandle*, const ResourceError& error)
1109 {
1110     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1111
1112     GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
1113     GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (0));
1114     gst_app_src_end_of_stream(src->priv->appsrc);
1115 }
1116
1117 void ResourceHandleStreamingClient::wasBlocked(ResourceHandle*)
1118 {
1119     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1120     GUniquePtr<gchar> uri;
1121
1122     GST_ERROR_OBJECT(src, "Request was blocked");
1123
1124     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
1125     uri.reset(g_strdup(src->priv->uri));
1126     locker.unlock();
1127
1128     GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, ("Access to \"%s\" was blocked", uri.get()), (0));
1129 }
1130
1131 void ResourceHandleStreamingClient::cannotShowURL(ResourceHandle*)
1132 {
1133     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src);
1134     GUniquePtr<gchar> uri;
1135
1136     GST_ERROR_OBJECT(src, "Cannot show URL");
1137
1138     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
1139     uri.reset(g_strdup(src->priv->uri));
1140     locker.unlock();
1141
1142     GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, ("Can't show \"%s\"", uri.get()), (0));
1143 }
1144
1145 #endif // USE(GSTREAMER)
1146