c61300963fb9e6d80c833e775df9861fa920b830
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / WebKitWebSourceGStreamer.cpp
1 /*
2  *  Copyright (C) 2009, 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *  Copyright (C) 2013 Collabora Ltd.
4  *
5  *  This library is free software; you can redistribute it and/or
6  *  modify it under the terms of the GNU Lesser General Public
7  *  License as published by the Free Software Foundation; either
8  *  version 2 of the License, or (at your option) any later version.
9  *
10  *  This library is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  *  Lesser General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Lesser General Public
16  *  License along with this library; if not, write to the Free Software
17  *  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19
20 #include "config.h"
21 #include "WebKitWebSourceGStreamer.h"
22
23 #if ENABLE(VIDEO) && USE(GSTREAMER)
24
25 #include "GRefPtrGStreamer.h"
26 #include "GStreamerUtilities.h"
27 #include "GUniquePtrGStreamer.h"
28 #include "HTTPHeaderNames.h"
29 #include "MainThreadNotifier.h"
30 #include "MediaPlayer.h"
31 #include "NotImplemented.h"
32 #include "PlatformMediaResourceLoader.h"
33 #include "ResourceError.h"
34 #include "ResourceHandle.h"
35 #include "ResourceHandleClient.h"
36 #include "ResourceRequest.h"
37 #include "ResourceResponse.h"
38 #include "SharedBuffer.h"
39 #include <gst/app/gstappsrc.h>
40 #include <gst/gst.h>
41 #include <gst/pbutils/missing-plugins.h>
42 #include <wtf/MainThread.h>
43 #include <wtf/Noncopyable.h>
44 #include <wtf/glib/GMutexLocker.h>
45 #include <wtf/glib/GRefPtr.h>
46 #include <wtf/glib/GUniquePtr.h>
47 #include <wtf/text/CString.h>
48
49 #if USE(SOUP)
50 #include "SoupNetworkSession.h"
51 #endif
52
53 using namespace WebCore;
54
55 class StreamingClient {
56 public:
57     StreamingClient(WebKitWebSrc*, ResourceRequest&&);
58     virtual ~StreamingClient();
59
60 protected:
61     char* createReadBuffer(size_t requestedSize, size_t& actualSize);
62     void handleResponseReceived(const ResourceResponse&);
63     void handleDataReceived(const char*, int);
64     void handleNotifyFinished();
65
66     GRefPtr<GstElement> m_src;
67     ResourceRequest m_request;
68 };
69
70 class CachedResourceStreamingClient final : public PlatformMediaResourceClient, public StreamingClient {
71     WTF_MAKE_NONCOPYABLE(CachedResourceStreamingClient);
72 public:
73     CachedResourceStreamingClient(WebKitWebSrc*, ResourceRequest&&);
74     virtual ~CachedResourceStreamingClient();
75
76 private:
77     // PlatformMediaResourceClient virtual methods.
78 #if USE(SOUP)
79     char* getOrCreateReadBuffer(PlatformMediaResource&, size_t requestedSize, size_t& actualSize) override;
80 #endif
81     void responseReceived(PlatformMediaResource&, const ResourceResponse&) override;
82     void dataReceived(PlatformMediaResource&, const char*, int) override;
83     void accessControlCheckFailed(PlatformMediaResource&, const ResourceError&) override;
84     void loadFailed(PlatformMediaResource&, const ResourceError&) override;
85     void loadFinished(PlatformMediaResource&) override;
86 };
87
88 class ResourceHandleStreamingClient : public ThreadSafeRefCounted<ResourceHandleStreamingClient>, public ResourceHandleClient, public StreamingClient {
89 public:
90     static Ref<ResourceHandleStreamingClient> create(WebKitWebSrc* src, ResourceRequest&& request)
91     {
92         return adoptRef(*new ResourceHandleStreamingClient(src, WTFMove(request)));
93     }
94     virtual ~ResourceHandleStreamingClient();
95
96     void invalidate();
97
98     // StreamingClient virtual methods.
99     bool loadFailed() const;
100     void setDefersLoading(bool);
101
102 private:
103     ResourceHandleStreamingClient(WebKitWebSrc*, ResourceRequest&&);
104     void cleanupAndStopRunLoop();
105
106     // ResourceHandleClient virtual methods.
107 #if USE(SOUP)
108     char* getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize) override;
109 #endif
110     ResourceRequest willSendRequest(ResourceHandle*, ResourceRequest&&, ResourceResponse&&) override;
111     void didReceiveResponse(ResourceHandle*, ResourceResponse&&) override;
112     void didReceiveData(ResourceHandle*, const char*, unsigned, int) override;
113     void didReceiveBuffer(ResourceHandle*, Ref<SharedBuffer>&&, int encodedLength) override;
114     void didFinishLoading(ResourceHandle*) override;
115     void didFail(ResourceHandle*, const ResourceError&) override;
116     void wasBlocked(ResourceHandle*) override;
117     void cannotShowURL(ResourceHandle*) override;
118
119     RefPtr<Thread> m_thread;
120     Lock m_initializeRunLoopConditionMutex;
121     Condition m_initializeRunLoopCondition;
122     RunLoop* m_runLoop { nullptr };
123     Lock m_terminateRunLoopConditionMutex;
124     Condition m_terminateRunLoopCondition;
125     RefPtr<ResourceHandle> m_resource;
126 #if USE(SOUP)
127     std::unique_ptr<SoupNetworkSession> m_session;
128 #endif
129 };
130
131 enum MainThreadSourceNotification {
132     Start = 1 << 0,
133     Stop = 1 << 1,
134     NeedData = 1 << 2,
135     EnoughData = 1 << 3,
136     Seek = 1 << 4
137 };
138
139 #define WEBKIT_WEB_SRC_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), WEBKIT_TYPE_WEB_SRC, WebKitWebSrcPrivate))
140 struct _WebKitWebSrcPrivate {
141     GstAppSrc* appsrc;
142     GstPad* srcpad;
143     CString originalURI;
144     CString redirectedURI;
145     bool keepAlive;
146     GUniquePtr<GstStructure> extraHeaders;
147     bool compress;
148     GUniquePtr<gchar> httpMethod;
149
150     WebCore::MediaPlayer* player;
151
152     RefPtr<PlatformMediaResourceLoader> loader;
153     RefPtr<PlatformMediaResource> resource;
154     RefPtr<ResourceHandleStreamingClient> client;
155
156     bool didPassAccessControlCheck;
157
158     guint64 offset;
159     guint64 size;
160     gboolean seekable;
161     bool paused;
162     bool isSeeking;
163
164     guint64 requestedOffset;
165
166     bool createdInMainThread;
167     MainThreadNotifier<MainThreadSourceNotification> notifier;
168     GRefPtr<GstBuffer> buffer;
169 };
170
171 enum {
172     PROP_0,
173     PROP_LOCATION,
174     PROP_RESOLVED_LOCATION,
175     PROP_KEEP_ALIVE,
176     PROP_EXTRA_HEADERS,
177     PROP_COMPRESS,
178     PROP_METHOD
179 };
180
181 static GstStaticPadTemplate srcTemplate = GST_STATIC_PAD_TEMPLATE("src",
182                                                                   GST_PAD_SRC,
183                                                                   GST_PAD_ALWAYS,
184                                                                   GST_STATIC_CAPS_ANY);
185
186 GST_DEBUG_CATEGORY_STATIC(webkit_web_src_debug);
187 #define GST_CAT_DEFAULT webkit_web_src_debug
188
189 static void webKitWebSrcUriHandlerInit(gpointer gIface, gpointer ifaceData);
190
191 static void webKitWebSrcDispose(GObject*);
192 static void webKitWebSrcFinalize(GObject*);
193 static void webKitWebSrcSetProperty(GObject*, guint propertyID, const GValue*, GParamSpec*);
194 static void webKitWebSrcGetProperty(GObject*, guint propertyID, GValue*, GParamSpec*);
195 static GstStateChangeReturn webKitWebSrcChangeState(GstElement*, GstStateChange);
196
197 static gboolean webKitWebSrcQueryWithParent(GstPad*, GstObject*, GstQuery*);
198
199 static void webKitWebSrcNeedData(WebKitWebSrc*);
200 static void webKitWebSrcEnoughData(WebKitWebSrc*);
201 static gboolean webKitWebSrcSeek(WebKitWebSrc*, guint64);
202
203 static GstAppSrcCallbacks appsrcCallbacks = {
204     // need_data
205     [](GstAppSrc*, guint, gpointer userData) {
206         webKitWebSrcNeedData(WEBKIT_WEB_SRC(userData));
207     },
208     // enough_data
209     [](GstAppSrc*, gpointer userData) {
210         webKitWebSrcEnoughData(WEBKIT_WEB_SRC(userData));
211     },
212     // seek_data
213     [](GstAppSrc*, guint64 offset, gpointer userData) -> gboolean {
214         return webKitWebSrcSeek(WEBKIT_WEB_SRC(userData), offset);
215     },
216     { nullptr }
217 };
218
219 #define webkit_web_src_parent_class parent_class
220 // We split this out into another macro to avoid a check-webkit-style error.
221 #define WEBKIT_WEB_SRC_CATEGORY_INIT GST_DEBUG_CATEGORY_INIT(webkit_web_src_debug, "webkitwebsrc", 0, "websrc element");
222 G_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_BIN,
223                          G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, webKitWebSrcUriHandlerInit);
224                          WEBKIT_WEB_SRC_CATEGORY_INIT);
225
226 static void webkit_web_src_class_init(WebKitWebSrcClass* klass)
227 {
228     GObjectClass* oklass = G_OBJECT_CLASS(klass);
229     GstElementClass* eklass = GST_ELEMENT_CLASS(klass);
230
231     oklass->dispose = webKitWebSrcDispose;
232     oklass->finalize = webKitWebSrcFinalize;
233     oklass->set_property = webKitWebSrcSetProperty;
234     oklass->get_property = webKitWebSrcGetProperty;
235
236     gst_element_class_add_pad_template(eklass,
237                                        gst_static_pad_template_get(&srcTemplate));
238     gst_element_class_set_metadata(eklass, "WebKit Web source element", "Source", "Handles HTTP/HTTPS uris",
239                                "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
240
241     /* Allows setting the uri using the 'location' property, which is used
242      * for example by gst_element_make_from_uri() */
243     g_object_class_install_property(oklass, PROP_LOCATION,
244         g_param_spec_string("location", "location", "Location to read from",
245             nullptr, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
246
247     g_object_class_install_property(oklass, PROP_RESOLVED_LOCATION,
248         g_param_spec_string("resolved-location", "Resolved location", "The location resolved by the server",
249             nullptr, static_cast<GParamFlags>(G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)));
250
251     g_object_class_install_property(oklass, PROP_KEEP_ALIVE,
252         g_param_spec_boolean("keep-alive", "keep-alive", "Use HTTP persistent connections",
253             FALSE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
254
255     g_object_class_install_property(oklass, PROP_EXTRA_HEADERS,
256         g_param_spec_boxed("extra-headers", "Extra Headers", "Extra headers to append to the HTTP request",
257             GST_TYPE_STRUCTURE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
258
259     g_object_class_install_property(oklass, PROP_COMPRESS,
260         g_param_spec_boolean("compress", "Compress", "Allow compressed content encodings",
261             FALSE, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
262
263     g_object_class_install_property(oklass, PROP_METHOD,
264         g_param_spec_string("method", "method", "The HTTP method to use (default: GET)",
265             nullptr, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
266
267     eklass->change_state = webKitWebSrcChangeState;
268
269     g_type_class_add_private(klass, sizeof(WebKitWebSrcPrivate));
270 }
271
272 static void webkit_web_src_init(WebKitWebSrc* src)
273 {
274     WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC_GET_PRIVATE(src);
275
276     src->priv = priv;
277     new (priv) WebKitWebSrcPrivate();
278
279     priv->createdInMainThread = isMainThread();
280
281     priv->appsrc = GST_APP_SRC(gst_element_factory_make("appsrc", nullptr));
282     if (!priv->appsrc) {
283         GST_ERROR_OBJECT(src, "Failed to create appsrc");
284         return;
285     }
286
287     gst_bin_add(GST_BIN(src), GST_ELEMENT(priv->appsrc));
288
289
290     GRefPtr<GstPad> targetPad = adoptGRef(gst_element_get_static_pad(GST_ELEMENT(priv->appsrc), "src"));
291     priv->srcpad = webkitGstGhostPadFromStaticTemplate(&srcTemplate, "src", targetPad.get());
292
293     gst_element_add_pad(GST_ELEMENT(src), priv->srcpad);
294
295     GST_OBJECT_FLAG_SET(priv->srcpad, GST_PAD_FLAG_NEED_PARENT);
296     gst_pad_set_query_function(priv->srcpad, webKitWebSrcQueryWithParent);
297
298     gst_app_src_set_callbacks(priv->appsrc, &appsrcCallbacks, src, nullptr);
299     gst_app_src_set_emit_signals(priv->appsrc, FALSE);
300     gst_app_src_set_stream_type(priv->appsrc, GST_APP_STREAM_TYPE_SEEKABLE);
301
302     // 512k is a abitrary number but we should choose a value
303     // here to not pause/unpause the SoupMessage too often and
304     // to make sure there's always some data available for
305     // GStreamer to handle.
306     gst_app_src_set_max_bytes(priv->appsrc, 512 * 1024);
307
308     // Emit the need-data signal if the queue contains less
309     // than 20% of data. Without this the need-data signal
310     // is emitted when the queue is empty, we then dispatch
311     // the soup message unpausing to the main loop and from
312     // there unpause the soup message. This already takes
313     // quite some time and libsoup even needs some more time
314     // to actually provide data again. If we do all this
315     // already if the queue is 20% empty, it's much more
316     // likely that libsoup already provides new data before
317     // the queue is really empty.
318     // This might need tweaking for ports not using libsoup.
319     g_object_set(priv->appsrc, "min-percent", 20, nullptr);
320
321     gst_base_src_set_automatic_eos(GST_BASE_SRC(priv->appsrc), FALSE);
322
323     gst_app_src_set_caps(priv->appsrc, nullptr);
324     gst_app_src_set_size(priv->appsrc, -1);
325 }
326
327 static void webKitWebSrcDispose(GObject* object)
328 {
329     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
330     WebKitWebSrcPrivate* priv = src->priv;
331
332     priv->player = nullptr;
333
334     GST_CALL_PARENT(G_OBJECT_CLASS, dispose, (object));
335 }
336
337 static void webKitWebSrcFinalize(GObject* object)
338 {
339     WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC(object)->priv;
340
341     priv->~WebKitWebSrcPrivate();
342
343     GST_CALL_PARENT(G_OBJECT_CLASS, finalize, (object));
344 }
345
346 static void webKitWebSrcSetProperty(GObject* object, guint propID, const GValue* value, GParamSpec* pspec)
347 {
348     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
349
350     switch (propID) {
351     case PROP_LOCATION:
352         gst_uri_handler_set_uri(reinterpret_cast<GstURIHandler*>(src), g_value_get_string(value), nullptr);
353         break;
354     case PROP_KEEP_ALIVE:
355         src->priv->keepAlive = g_value_get_boolean(value);
356         break;
357     case PROP_EXTRA_HEADERS: {
358         const GstStructure* s = gst_value_get_structure(value);
359         src->priv->extraHeaders.reset(s ? gst_structure_copy(s) : nullptr);
360         break;
361     }
362     case PROP_COMPRESS:
363         src->priv->compress = g_value_get_boolean(value);
364         break;
365     case PROP_METHOD:
366         src->priv->httpMethod.reset(g_value_dup_string(value));
367         break;
368     default:
369         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propID, pspec);
370         break;
371     }
372 }
373
374 static void webKitWebSrcGetProperty(GObject* object, guint propID, GValue* value, GParamSpec* pspec)
375 {
376     WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
377     WebKitWebSrcPrivate* priv = src->priv;
378
379     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
380     switch (propID) {
381     case PROP_LOCATION:
382         g_value_set_string(value, priv->originalURI.data());
383         break;
384     case PROP_RESOLVED_LOCATION:
385         g_value_set_string(value, priv->redirectedURI.isNull() ? priv->originalURI.data() : priv->redirectedURI.data());
386         break;
387     case PROP_KEEP_ALIVE:
388         g_value_set_boolean(value, priv->keepAlive);
389         break;
390     case PROP_EXTRA_HEADERS:
391         gst_value_set_structure(value, priv->extraHeaders.get());
392         break;
393     case PROP_COMPRESS:
394         g_value_set_boolean(value, priv->compress);
395         break;
396     case PROP_METHOD:
397         g_value_set_string(value, priv->httpMethod.get());
398         break;
399     default:
400         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, propID, pspec);
401         break;
402     }
403 }
404
405 static void webKitWebSrcStop(WebKitWebSrc* src)
406 {
407     WebKitWebSrcPrivate* priv = src->priv;
408
409     if (priv->resource || (priv->loader && !priv->keepAlive)) {
410         GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
411         priv->notifier.cancelPendingNotifications(MainThreadSourceNotification::NeedData | MainThreadSourceNotification::EnoughData | MainThreadSourceNotification::Seek);
412         priv->notifier.notify(MainThreadSourceNotification::Stop, [protector, keepAlive = priv->keepAlive] {
413             WebKitWebSrcPrivate* priv = protector->priv;
414
415             WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(protector.get()));
416             if (priv->resource) {
417                 priv->resource->stop();
418                 priv->resource->setClient(nullptr);
419                 priv->resource = nullptr;
420             }
421
422             if (!keepAlive)
423                 priv->loader = nullptr;
424         });
425     }
426
427     if (priv->client) {
428         priv->client->invalidate();
429         priv->client = nullptr;
430     }
431
432     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
433
434     bool wasSeeking = std::exchange(priv->isSeeking, false);
435
436     if (priv->buffer) {
437         unmapGstBuffer(priv->buffer.get());
438         priv->buffer.clear();
439     }
440
441     priv->paused = false;
442
443     priv->offset = 0;
444     priv->seekable = FALSE;
445
446     if (!wasSeeking) {
447         priv->size = 0;
448         priv->requestedOffset = 0;
449         priv->player = nullptr;
450     }
451
452     locker.unlock();
453
454     if (priv->appsrc) {
455         gst_app_src_set_caps(priv->appsrc, nullptr);
456         if (!wasSeeking)
457             gst_app_src_set_size(priv->appsrc, -1);
458     }
459
460     GST_DEBUG_OBJECT(src, "Stopped request");
461 }
462
463 static bool webKitWebSrcSetExtraHeader(GQuark fieldId, const GValue* value, gpointer userData)
464 {
465     GUniquePtr<gchar> fieldContent;
466
467     if (G_VALUE_HOLDS_STRING(value))
468         fieldContent.reset(g_value_dup_string(value));
469     else {
470         GValue dest = G_VALUE_INIT;
471
472         g_value_init(&dest, G_TYPE_STRING);
473         if (g_value_transform(value, &dest))
474             fieldContent.reset(g_value_dup_string(&dest));
475     }
476
477     const gchar* fieldName = g_quark_to_string(fieldId);
478     if (!fieldContent.get()) {
479         GST_ERROR("extra-headers field '%s' contains no value or can't be converted to a string", fieldName);
480         return false;
481     }
482
483     GST_DEBUG("Appending extra header: \"%s: %s\"", fieldName, fieldContent.get());
484     ResourceRequest* request = static_cast<ResourceRequest*>(userData);
485     request->setHTTPHeaderField(fieldName, fieldContent.get());
486     return true;
487 }
488
489 static gboolean webKitWebSrcProcessExtraHeaders(GQuark fieldId, const GValue* value, gpointer userData)
490 {
491     if (G_VALUE_TYPE(value) == GST_TYPE_ARRAY) {
492         unsigned size = gst_value_array_get_size(value);
493
494         for (unsigned i = 0; i < size; i++) {
495             if (!webKitWebSrcSetExtraHeader(fieldId, gst_value_array_get_value(value, i), userData))
496                 return FALSE;
497         }
498         return TRUE;
499     }
500
501     if (G_VALUE_TYPE(value) == GST_TYPE_LIST) {
502         unsigned size = gst_value_list_get_size(value);
503
504         for (unsigned i = 0; i < size; i++) {
505             if (!webKitWebSrcSetExtraHeader(fieldId, gst_value_list_get_value(value, i), userData))
506                 return FALSE;
507         }
508         return TRUE;
509     }
510
511     return webKitWebSrcSetExtraHeader(fieldId, value, userData);
512 }
513
514 static void webKitWebSrcStart(WebKitWebSrc* src)
515 {
516     WebKitWebSrcPrivate* priv = src->priv;
517
518     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
519
520     priv->didPassAccessControlCheck = false;
521
522     if (priv->originalURI.isNull()) {
523         GST_ERROR_OBJECT(src, "No URI provided");
524         locker.unlock();
525         webKitWebSrcStop(src);
526         return;
527     }
528
529     ASSERT(!priv->client);
530
531     GST_DEBUG_OBJECT(src, "Fetching %s", priv->originalURI.data());
532     URL url = URL(URL(), priv->originalURI.data());
533
534     ResourceRequest request(url);
535     request.setAllowCookies(true);
536     request.setFirstPartyForCookies(url);
537
538     priv->size = 0;
539
540     if (priv->player)
541         request.setHTTPReferrer(priv->player->referrer());
542
543     if (priv->httpMethod.get())
544         request.setHTTPMethod(priv->httpMethod.get());
545
546 #if USE(SOUP)
547     // By default, HTTP Accept-Encoding is disabled here as we don't
548     // want the received response to be encoded in any way as we need
549     // to rely on the proper size of the returned data on
550     // didReceiveResponse.
551     // If Accept-Encoding is used, the server may send the data in encoded format and
552     // request.expectedContentLength() will have the "wrong" size (the size of the
553     // compressed data), even though the data received in didReceiveData is uncompressed.
554     // This is however useful to enable for adaptive streaming
555     // scenarios, when the demuxer needs to download playlists.
556     if (!priv->compress)
557         request.setAcceptEncoding(false);
558 #endif
559
560     // Let Apple web servers know we want to access their nice movie trailers.
561     if (!g_ascii_strcasecmp("movies.apple.com", url.host().utf8().data())
562         || !g_ascii_strcasecmp("trailers.apple.com", url.host().utf8().data()))
563         request.setHTTPUserAgent("Quicktime/7.6.6");
564
565     if (priv->requestedOffset) {
566         GUniquePtr<gchar> val(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", priv->requestedOffset));
567         request.setHTTPHeaderField(HTTPHeaderName::Range, val.get());
568     }
569     priv->offset = priv->requestedOffset;
570
571     if (!priv->keepAlive) {
572         GST_DEBUG_OBJECT(src, "Persistent connection support disabled");
573         request.setHTTPHeaderField(HTTPHeaderName::Connection, "close");
574     }
575
576     if (priv->extraHeaders)
577         gst_structure_foreach(priv->extraHeaders.get(), webKitWebSrcProcessExtraHeaders, &request);
578
579     // We always request Icecast/Shoutcast metadata, just in case ...
580     request.setHTTPHeaderField(HTTPHeaderName::IcyMetadata, "1");
581
582     if (!priv->player || !priv->createdInMainThread) {
583         priv->client = ResourceHandleStreamingClient::create(src, WTFMove(request));
584         if (priv->client->loadFailed()) {
585             GST_ERROR_OBJECT(src, "Failed to setup streaming client");
586             locker.unlock();
587             webKitWebSrcStop(src);
588         } else
589             GST_DEBUG_OBJECT(src, "Started request");
590         return;
591     }
592
593     locker.unlock();
594     GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
595     priv->notifier.notify(MainThreadSourceNotification::Start, [protector, request = WTFMove(request)] {
596         WebKitWebSrcPrivate* priv = protector->priv;
597
598         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(protector.get()));
599         if (!priv->loader)
600             priv->loader = priv->player->createResourceLoader();
601
602         PlatformMediaResourceLoader::LoadOptions loadOptions = 0;
603         if (request.url().protocolIsBlob())
604             loadOptions |= PlatformMediaResourceLoader::LoadOption::BufferData;
605         priv->resource = priv->loader->requestResource(ResourceRequest(request), loadOptions);
606         if (priv->resource) {
607             priv->resource->setClient(std::make_unique<CachedResourceStreamingClient>(protector.get(), ResourceRequest(request)));
608             GST_DEBUG_OBJECT(protector.get(), "Started request");
609         } else {
610             GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client");
611             priv->loader = nullptr;
612             locker.unlock();
613             webKitWebSrcStop(protector.get());
614         }
615     });
616 }
617
618 static GstStateChangeReturn webKitWebSrcChangeState(GstElement* element, GstStateChange transition)
619 {
620     GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
621     WebKitWebSrc* src = WEBKIT_WEB_SRC(element);
622     WebKitWebSrcPrivate* priv = src->priv;
623
624     switch (transition) {
625     case GST_STATE_CHANGE_NULL_TO_READY:
626         if (!priv->appsrc) {
627             gst_element_post_message(element,
628                                      gst_missing_element_message_new(element, "appsrc"));
629             GST_ELEMENT_ERROR(src, CORE, MISSING_PLUGIN, (nullptr), ("no appsrc"));
630             return GST_STATE_CHANGE_FAILURE;
631         }
632         break;
633     default:
634         break;
635     }
636
637     ret = GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
638     if (G_UNLIKELY(ret == GST_STATE_CHANGE_FAILURE)) {
639         GST_DEBUG_OBJECT(src, "State change failed");
640         return ret;
641     }
642
643     switch (transition) {
644     case GST_STATE_CHANGE_READY_TO_PAUSED:
645     {
646         GST_DEBUG_OBJECT(src, "READY->PAUSED");
647         webKitWebSrcStart(src);
648         break;
649     }
650     case GST_STATE_CHANGE_PAUSED_TO_READY:
651     {
652         GST_DEBUG_OBJECT(src, "PAUSED->READY");
653         webKitWebSrcStop(src);
654         break;
655     }
656     default:
657         break;
658     }
659
660     return ret;
661 }
662
663 static gboolean webKitWebSrcQueryWithParent(GstPad* pad, GstObject* parent, GstQuery* query)
664 {
665     WebKitWebSrc* src = WEBKIT_WEB_SRC(GST_ELEMENT(parent));
666     gboolean result = FALSE;
667
668     switch (GST_QUERY_TYPE(query)) {
669     case GST_QUERY_DURATION: {
670         GstFormat format;
671
672         gst_query_parse_duration(query, &format, nullptr);
673
674         GST_DEBUG_OBJECT(src, "duration query in format %s", gst_format_get_name(format));
675         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
676         if (format == GST_FORMAT_BYTES && src->priv->size > 0) {
677             gst_query_set_duration(query, format, src->priv->size);
678             result = TRUE;
679         }
680         break;
681     }
682     case GST_QUERY_URI: {
683         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
684         gst_query_set_uri(query, src->priv->originalURI.data());
685         if (!src->priv->redirectedURI.isNull())
686             gst_query_set_uri_redirection(query, src->priv->redirectedURI.data());
687         result = TRUE;
688         break;
689     }
690     case GST_QUERY_SCHEDULING: {
691         GstSchedulingFlags flags;
692         int minSize, maxSize, align;
693
694         gst_query_parse_scheduling(query, &flags, &minSize, &maxSize, &align);
695         gst_query_set_scheduling(query, static_cast<GstSchedulingFlags>(flags | GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED), minSize, maxSize, align);
696         result = TRUE;
697         break;
698     }
699     default: {
700         GRefPtr<GstPad> target = adoptGRef(gst_ghost_pad_get_target(GST_GHOST_PAD_CAST(pad)));
701
702         // Forward the query to the proxy target pad.
703         if (target)
704             result = gst_pad_query(target.get(), query);
705         break;
706     }
707     }
708
709     return result;
710 }
711
712 static bool urlHasSupportedProtocol(const URL& url)
713 {
714     return url.isValid() && (url.protocolIsInHTTPFamily() || url.protocolIsBlob());
715 }
716
717 // uri handler interface
718
719 static GstURIType webKitWebSrcUriGetType(GType)
720 {
721     return GST_URI_SRC;
722 }
723
724 const gchar* const* webKitWebSrcGetProtocols(GType)
725 {
726     static const char* protocols[] = {"http", "https", "blob", nullptr };
727     return protocols;
728 }
729
730 static gchar* webKitWebSrcGetUri(GstURIHandler* handler)
731 {
732     WebKitWebSrc* src = WEBKIT_WEB_SRC(handler);
733     gchar* ret;
734
735     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
736     ret = g_strdup(src->priv->originalURI.data());
737     return ret;
738 }
739
740 static gboolean webKitWebSrcSetUri(GstURIHandler* handler, const gchar* uri, GError** error)
741 {
742     WebKitWebSrc* src = WEBKIT_WEB_SRC(handler);
743     WebKitWebSrcPrivate* priv = src->priv;
744
745     if (GST_STATE(src) >= GST_STATE_PAUSED) {
746         GST_ERROR_OBJECT(src, "URI can only be set in states < PAUSED");
747         return FALSE;
748     }
749
750     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
751
752     priv->redirectedURI = CString();
753     priv->originalURI = CString();
754     if (!uri)
755         return TRUE;
756
757     URL url(URL(), uri);
758     if (!urlHasSupportedProtocol(url)) {
759         g_set_error(error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, "Invalid URI '%s'", uri);
760         return FALSE;
761     }
762
763     priv->originalURI = url.string().utf8();
764     return TRUE;
765 }
766
767 static void webKitWebSrcUriHandlerInit(gpointer gIface, gpointer)
768 {
769     GstURIHandlerInterface* iface = (GstURIHandlerInterface *) gIface;
770
771     iface->get_type = webKitWebSrcUriGetType;
772     iface->get_protocols = webKitWebSrcGetProtocols;
773     iface->get_uri = webKitWebSrcGetUri;
774     iface->set_uri = webKitWebSrcSetUri;
775 }
776
777 static void webKitWebSrcNeedData(WebKitWebSrc* src)
778 {
779     WebKitWebSrcPrivate* priv = src->priv;
780
781     GST_DEBUG_OBJECT(src, "Need more data");
782
783     {
784         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
785         if (!priv->paused)
786             return;
787         priv->paused = false;
788         if (priv->client) {
789             priv->client->setDefersLoading(false);
790             return;
791         }
792     }
793
794     GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
795     priv->notifier.notify(MainThreadSourceNotification::NeedData, [protector] {
796         WebKitWebSrcPrivate* priv = protector->priv;
797         if (priv->resource)
798             priv->resource->setDefersLoading(false);
799     });
800 }
801
802 static void webKitWebSrcEnoughData(WebKitWebSrc* src)
803 {
804     WebKitWebSrcPrivate* priv = src->priv;
805
806     GST_DEBUG_OBJECT(src, "Have enough data");
807
808     {
809         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
810         if (priv->paused)
811             return;
812         priv->paused = true;
813         if (priv->client) {
814             priv->client->setDefersLoading(true);
815             return;
816         }
817     }
818
819     GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
820     priv->notifier.notify(MainThreadSourceNotification::EnoughData, [protector] {
821         WebKitWebSrcPrivate* priv = protector->priv;
822         if (priv->resource)
823             priv->resource->setDefersLoading(true);
824     });
825 }
826
827 static gboolean webKitWebSrcSeek(WebKitWebSrc* src, guint64 offset)
828 {
829     WebKitWebSrcPrivate* priv = src->priv;
830
831     {
832         WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
833         if (offset == priv->offset && priv->requestedOffset == priv->offset)
834             return TRUE;
835
836         if (!priv->seekable)
837             return FALSE;
838
839         priv->isSeeking = true;
840         priv->requestedOffset = offset;
841     }
842
843     GST_DEBUG_OBJECT(src, "Seeking to offset: %" G_GUINT64_FORMAT, src->priv->requestedOffset);
844     if (priv->client) {
845         webKitWebSrcStop(src);
846         webKitWebSrcStart(src);
847         return TRUE;
848     }
849
850     GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
851     priv->notifier.notify(MainThreadSourceNotification::Seek, [protector] {
852         webKitWebSrcStop(protector.get());
853         webKitWebSrcStart(protector.get());
854     });
855     return TRUE;
856 }
857
858 void webKitWebSrcSetMediaPlayer(WebKitWebSrc* src, WebCore::MediaPlayer* player)
859 {
860     ASSERT(player);
861     ASSERT(src->priv->createdInMainThread);
862     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
863     src->priv->player = player;
864 }
865
866 bool webKitSrcPassedCORSAccessCheck(WebKitWebSrc* src)
867 {
868     return src->priv->didPassAccessControlCheck;
869 }
870
871 StreamingClient::StreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
872     : m_src(GST_ELEMENT(src))
873     , m_request(WTFMove(request))
874 {
875 }
876
877 StreamingClient::~StreamingClient()
878 {
879 }
880
881 char* StreamingClient::createReadBuffer(size_t requestedSize, size_t& actualSize)
882 {
883     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
884     WebKitWebSrcPrivate* priv = src->priv;
885
886     ASSERT(!priv->buffer);
887
888     GstBuffer* buffer = gst_buffer_new_and_alloc(requestedSize);
889
890     mapGstBuffer(buffer, GST_MAP_WRITE);
891
892     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
893     priv->buffer = adoptGRef(buffer);
894     locker.unlock();
895
896     actualSize = gst_buffer_get_size(buffer);
897     return getGstBufferDataPointer(buffer);
898 }
899
900 void StreamingClient::handleResponseReceived(const ResourceResponse& response)
901 {
902     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
903     WebKitWebSrcPrivate* priv = src->priv;
904
905     GST_DEBUG_OBJECT(src, "Received response: %d", response.httpStatusCode());
906
907     auto responseURI = response.url().string().utf8();
908     if (priv->originalURI != responseURI)
909         priv->redirectedURI = WTFMove(responseURI);
910
911     if (response.httpStatusCode() >= 400) {
912         GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received %d HTTP error code", response.httpStatusCode()), (nullptr));
913         gst_app_src_end_of_stream(priv->appsrc);
914         webKitWebSrcStop(src);
915         return;
916     }
917
918     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
919
920     if (priv->isSeeking) {
921         GST_DEBUG_OBJECT(src, "Seek in progress, ignoring response");
922         return;
923     }
924
925     if (priv->requestedOffset) {
926         // Seeking ... we expect a 206 == PARTIAL_CONTENT
927         if (response.httpStatusCode() == 200) {
928             // Range request didn't have a ranged response; resetting offset.
929             priv->offset = 0;
930         } else if (response.httpStatusCode() != 206) {
931             // Range request completely failed.
932             locker.unlock();
933             GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received unexpected %d HTTP status code", response.httpStatusCode()), (nullptr));
934             gst_app_src_end_of_stream(priv->appsrc);
935             webKitWebSrcStop(src);
936             return;
937         }
938     }
939
940     long long length = response.expectedContentLength();
941     if (length > 0 && priv->requestedOffset && response.httpStatusCode() == 206)
942         length += priv->requestedOffset;
943
944     priv->size = length >= 0 ? length : 0;
945     priv->seekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
946
947     locker.unlock();
948
949     // notify size/duration
950     if (length > 0) {
951         gst_app_src_set_size(priv->appsrc, length);
952     } else
953         gst_app_src_set_size(priv->appsrc, -1);
954
955     gst_app_src_set_caps(priv->appsrc, nullptr);
956
957     // Emit a GST_EVENT_CUSTOM_DOWNSTREAM_STICKY event to let GStreamer know about the HTTP headers sent and received.
958     GstStructure* httpHeaders = gst_structure_new_empty("http-headers");
959     gst_structure_set(httpHeaders, "uri", G_TYPE_STRING, priv->originalURI.data(), nullptr);
960     if (!priv->redirectedURI.isNull())
961         gst_structure_set(httpHeaders, "redirection-uri", G_TYPE_STRING, priv->redirectedURI.data(), nullptr);
962     GUniquePtr<GstStructure> headers(gst_structure_new_empty("request-headers"));
963     for (const auto& header : m_request.httpHeaderFields())
964         gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
965     gst_structure_set(httpHeaders, "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
966     headers.reset(gst_structure_new_empty("response-headers"));
967     for (const auto& header : response.httpHeaderFields())
968         gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
969     gst_structure_set(httpHeaders, "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
970     gst_pad_push_event(GST_BASE_SRC_PAD(priv->appsrc), gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, httpHeaders));
971 }
972
973 void StreamingClient::handleDataReceived(const char* data, int length)
974 {
975     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
976     WebKitWebSrcPrivate* priv = src->priv;
977
978     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
979
980     GST_LOG_OBJECT(src, "Have %lld bytes of data", priv->buffer ? static_cast<long long>(gst_buffer_get_size(priv->buffer.get())) : length);
981
982     ASSERT(!priv->buffer || data == getGstBufferDataPointer(priv->buffer.get()));
983
984     if (priv->buffer)
985         unmapGstBuffer(priv->buffer.get());
986
987     if (priv->isSeeking) {
988         GST_DEBUG_OBJECT(src, "Seek in progress, ignoring data");
989         priv->buffer.clear();
990         return;
991     }
992
993     if (priv->offset < priv->requestedOffset) {
994         // Range request failed; seeking manually.
995         if (priv->offset + length <= priv->requestedOffset) {
996             // Discard all the buffers coming before the requested seek position.
997             priv->offset += length;
998             priv->buffer.clear();
999             return;
1000         }
1001
1002         if (priv->offset + length > priv->requestedOffset) {
1003             guint64 offset = priv->requestedOffset - priv->offset;
1004             data += offset;
1005             length -= offset;
1006             if (priv->buffer)
1007                 gst_buffer_resize(priv->buffer.get(), offset, -1);
1008             priv->offset = priv->requestedOffset;
1009         }
1010
1011         priv->requestedOffset = 0;
1012     }
1013
1014     // Ports using the GStreamer backend but not the soup implementation of ResourceHandle
1015     // won't be using buffers provided by this client, the buffer is created here in that case.
1016     if (!priv->buffer)
1017         priv->buffer = adoptGRef(createGstBufferForData(data, length));
1018     else
1019         gst_buffer_set_size(priv->buffer.get(), static_cast<gssize>(length));
1020
1021     GST_BUFFER_OFFSET(priv->buffer.get()) = priv->offset;
1022     if (priv->requestedOffset == priv->offset)
1023         priv->requestedOffset += length;
1024     priv->offset += length;
1025     // priv->size == 0 if received length on didReceiveResponse < 0.
1026     if (priv->size > 0 && priv->offset > priv->size) {
1027         GST_DEBUG_OBJECT(src, "Updating internal size from %" G_GUINT64_FORMAT " to %" G_GUINT64_FORMAT, priv->size, priv->offset);
1028         gst_app_src_set_size(priv->appsrc, priv->offset);
1029         priv->size = priv->offset;
1030     }
1031     GST_BUFFER_OFFSET_END(priv->buffer.get()) = priv->offset;
1032
1033     locker.unlock();
1034
1035     GstFlowReturn ret = gst_app_src_push_buffer(priv->appsrc, priv->buffer.leakRef());
1036     if (ret != GST_FLOW_OK && ret != GST_FLOW_EOS)
1037         GST_ELEMENT_ERROR(src, CORE, FAILED, (nullptr), (nullptr));
1038 }
1039
1040 void StreamingClient::handleNotifyFinished()
1041 {
1042     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
1043     WebKitWebSrcPrivate* priv = src->priv;
1044
1045     GST_DEBUG_OBJECT(src, "Have EOS");
1046
1047     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
1048     if (!priv->isSeeking) {
1049         locker.unlock();
1050         gst_app_src_end_of_stream(priv->appsrc);
1051     }
1052 }
1053
1054 CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
1055     : StreamingClient(src, WTFMove(request))
1056 {
1057 }
1058
1059 CachedResourceStreamingClient::~CachedResourceStreamingClient()
1060 {
1061 }
1062
1063 #if USE(SOUP)
1064 char* CachedResourceStreamingClient::getOrCreateReadBuffer(PlatformMediaResource&, size_t requestedSize, size_t& actualSize)
1065 {
1066     return createReadBuffer(requestedSize, actualSize);
1067 }
1068 #endif
1069
1070 void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, const ResourceResponse& response)
1071 {
1072     WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC(m_src.get())->priv;
1073     priv->didPassAccessControlCheck = priv->resource->didPassAccessControlCheck();
1074     handleResponseReceived(response);
1075 }
1076
1077 void CachedResourceStreamingClient::dataReceived(PlatformMediaResource&, const char* data, int length)
1078 {
1079     handleDataReceived(data, length);
1080 }
1081
1082 void CachedResourceStreamingClient::accessControlCheckFailed(PlatformMediaResource&, const ResourceError& error)
1083 {
1084     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
1085     GST_ELEMENT_ERROR(src, RESOURCE, READ, ("%s", error.localizedDescription().utf8().data()), (nullptr));
1086     gst_app_src_end_of_stream(src->priv->appsrc);
1087     webKitWebSrcStop(src);
1088 }
1089
1090 void CachedResourceStreamingClient::loadFailed(PlatformMediaResource&, const ResourceError& error)
1091 {
1092     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
1093
1094     if (!error.isCancellation()) {
1095         GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
1096         GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
1097     }
1098
1099     gst_app_src_end_of_stream(src->priv->appsrc);
1100 }
1101
1102 void CachedResourceStreamingClient::loadFinished(PlatformMediaResource&)
1103 {
1104     handleNotifyFinished();
1105 }
1106
1107 ResourceHandleStreamingClient::ResourceHandleStreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
1108     : StreamingClient(src, WTFMove(request))
1109 {
1110     LockHolder locker(m_initializeRunLoopConditionMutex);
1111     m_thread = Thread::create("ResourceHandleStreamingClient", [this] {
1112         {
1113             LockHolder locker(m_initializeRunLoopConditionMutex);
1114             m_runLoop = &RunLoop::current();
1115 #if USE(SOUP)
1116             m_session = std::make_unique<SoupNetworkSession>();
1117             m_resource = ResourceHandle::create(*m_session, m_request, this, true, false);
1118 #else
1119             // FIXME: This create will hit an assert in debug builds. See https://bugs.webkit.org/show_bug.cgi?id=167003.
1120             m_resource = ResourceHandle::create(nullptr, m_request, this, true, false);
1121 #endif
1122             m_initializeRunLoopCondition.notifyOne();
1123         }
1124         if (!m_resource)
1125             return;
1126
1127         m_runLoop->dispatch([this] { m_resource->setDefersLoading(false); });
1128         m_runLoop->run();
1129     });
1130     m_initializeRunLoopCondition.wait(m_initializeRunLoopConditionMutex);
1131 }
1132
1133 ResourceHandleStreamingClient::~ResourceHandleStreamingClient()
1134 {
1135     if (m_thread) {
1136         m_thread->detach();
1137         m_thread = nullptr;
1138     }
1139 }
1140
1141 void ResourceHandleStreamingClient::cleanupAndStopRunLoop()
1142 {
1143     m_resource->clearClient();
1144     m_resource->cancel();
1145     m_resource = nullptr;
1146 #if USE(SOUP)
1147     m_session = nullptr;
1148 #endif
1149     m_runLoop->stop();
1150 }
1151
1152 void ResourceHandleStreamingClient::invalidate()
1153 {
1154     if (m_runLoop == &RunLoop::current()) {
1155         cleanupAndStopRunLoop();
1156         return;
1157     }
1158
1159     LockHolder locker(m_terminateRunLoopConditionMutex);
1160     m_runLoop->dispatch([this, protectedThis = makeRef(*this)] {
1161         cleanupAndStopRunLoop();
1162         LockHolder locker(m_terminateRunLoopConditionMutex);
1163         m_terminateRunLoopCondition.notifyOne();
1164     });
1165     m_terminateRunLoopCondition.wait(m_terminateRunLoopConditionMutex);
1166 }
1167
1168 bool ResourceHandleStreamingClient::loadFailed() const
1169 {
1170     return !m_resource;
1171 }
1172
1173 void ResourceHandleStreamingClient::setDefersLoading(bool defers)
1174 {
1175     m_runLoop->dispatch([this, protectedThis = makeRef(*this), defers] {
1176         if (m_resource)
1177             m_resource->setDefersLoading(defers);
1178     });
1179 }
1180
1181 #if USE(SOUP)
1182 char* ResourceHandleStreamingClient::getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize)
1183 {
1184     return createReadBuffer(requestedSize, actualSize);
1185 }
1186 #endif
1187
1188 ResourceRequest ResourceHandleStreamingClient::willSendRequest(ResourceHandle*, ResourceRequest&& request, ResourceResponse&&)
1189 {
1190     return WTFMove(request);
1191 }
1192
1193 void ResourceHandleStreamingClient::didReceiveResponse(ResourceHandle*, ResourceResponse&& response)
1194 {
1195     if (m_resource)
1196         handleResponseReceived(response);
1197 }
1198
1199 void ResourceHandleStreamingClient::didReceiveData(ResourceHandle*, const char* /* data */, unsigned /* length */, int)
1200 {
1201     ASSERT_NOT_REACHED();
1202 }
1203
1204 void ResourceHandleStreamingClient::didReceiveBuffer(ResourceHandle*, Ref<SharedBuffer>&& buffer, int /* encodedLength */)
1205 {
1206     if (!m_resource)
1207         return;
1208
1209     for (const auto& segment : buffer.get())
1210         handleDataReceived(segment->data(), segment->size());
1211 }
1212
1213 void ResourceHandleStreamingClient::didFinishLoading(ResourceHandle*)
1214 {
1215     if (m_resource)
1216         handleNotifyFinished();
1217 }
1218
1219 void ResourceHandleStreamingClient::didFail(ResourceHandle*, const ResourceError& error)
1220 {
1221     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
1222
1223     GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
1224     GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
1225     gst_app_src_end_of_stream(src->priv->appsrc);
1226 }
1227
1228 void ResourceHandleStreamingClient::wasBlocked(ResourceHandle*)
1229 {
1230     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
1231     GUniquePtr<gchar> uri;
1232
1233     GST_ERROR_OBJECT(src, "Request was blocked");
1234
1235     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
1236     uri.reset(g_strdup(src->priv->originalURI.data()));
1237     locker.unlock();
1238
1239     GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, ("Access to \"%s\" was blocked", uri.get()), (nullptr));
1240 }
1241
1242 void ResourceHandleStreamingClient::cannotShowURL(ResourceHandle*)
1243 {
1244     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
1245     GUniquePtr<gchar> uri;
1246
1247     GST_ERROR_OBJECT(src, "Cannot show URL");
1248
1249     WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
1250     uri.reset(g_strdup(src->priv->originalURI.data()));
1251     locker.unlock();
1252
1253     GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, ("Can't show \"%s\"", uri.get()), (nullptr));
1254 }
1255
1256 #endif // USE(GSTREAMER)
1257