[GStreamer] Rewrite HTTP source element using pushsrc base class
authorphiln@webkit.org <philn@webkit.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Mon, 18 Mar 2019 10:34:44 +0000 (10:34 +0000)
committerphiln@webkit.org <philn@webkit.org@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Mon, 18 Mar 2019 10:34:44 +0000 (10:34 +0000)
https://bugs.webkit.org/show_bug.cgi?id=195631

Reviewed by Xabier Rodriguez-Calvar.

Source/WebCore:

If we want to use webkitwebsrc in adaptivedemux (HLS, DASH, etc)
we need a source element that behaves like souphttpsrc, which is
implemented using pushsrc. This rewrite might also fix some seek
issues.

No new tests, existing http/tests/media tests cover this patch.

* platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:
(WebCore::MediaPlayerPrivateGStreamer::handleMessage):
* platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:
(webkit_web_src_class_init):
(webkitWebSrcReset):
(webkit_web_src_init):
(webKitWebSrcCreate):
(webKitWebSrcStart):
(webKitWebSrcCloseSession):
(webKitWebSrcStop):
(webKitWebSrcGetSize):
(webKitWebSrcIsSeekable):
(webKitWebSrcDoSeek):
(webKitWebSrcQuery):
(webKitWebSrcUnLock):
(webKitWebSrcUnLockStop):
(webKitWebSrcChangeState):
(CachedResourceStreamingClient::checkUpdateBlocksize):
(CachedResourceStreamingClient::responseReceived):
(CachedResourceStreamingClient::dataReceived):
(CachedResourceStreamingClient::accessControlCheckFailed):
(CachedResourceStreamingClient::loadFailed):
(CachedResourceStreamingClient::loadFinished):
* platform/graphics/gstreamer/WebKitWebSourceGStreamer.h:

LayoutTests:

* platform/gtk/TestExpectations:
* platform/gtk/http/tests/media/hls/video-controls-live-stream-expected.txt:
Update expectations, though it's not really related with this
patch.

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@243058 268f45cc-cd09-0410-ab3c-d52691b4dbfc

LayoutTests/ChangeLog
LayoutTests/http/tests/media/video-play-stall-seek-expected.txt
LayoutTests/platform/gtk/TestExpectations
LayoutTests/platform/gtk/http/tests/media/hls/video-controls-live-stream-expected.txt
Source/WebCore/ChangeLog
Source/WebCore/platform/graphics/gstreamer/MainThreadNotifier.h
Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp
Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp
Source/WebCore/platform/graphics/gstreamer/WebKitWebSourceGStreamer.h

index bffe672..b5c6b28 100644 (file)
@@ -1,3 +1,15 @@
+2019-03-18  Philippe Normand  <pnormand@igalia.com>
+
+        [GStreamer] Rewrite HTTP source element using pushsrc base class
+        https://bugs.webkit.org/show_bug.cgi?id=195631
+
+        Reviewed by Xabier Rodriguez-Calvar.
+
+        * platform/gtk/TestExpectations:
+        * platform/gtk/http/tests/media/hls/video-controls-live-stream-expected.txt:
+        Update expectations, though it's not really related with this
+        patch.
+
 2019-03-18  Claudio Saavedra  <csaavedra@igalia.com>
 
         [GTK][GStreamer] Mark flaky crashing test
index f852f1f..069aad5 100644 (file)
@@ -2210,7 +2210,6 @@ Bug(GTK) jquery/traversing.html [ Pass Slow ]
 Bug(GTK) tables/mozilla/other/slashlogo.html [ Pass Slow ]
 
 Bug(GTK) http/tests/media/video-preload.html [ Pass Slow ]
-webkit.org/b/143989 [ Release ] http/tests/media/hls/video-controls-live-stream.html [ Pass Slow Failure ]
 
 webkit.org/b/116958 http/tests/navigation/slowmetaredirect-basic.html [ Pass Slow ]
 webkit.org/b/116958 http/tests/navigation/slowtimerredirect-basic.html [ Pass Slow ]
@@ -2735,8 +2734,6 @@ webkit.org/b/122021 media/video-controls-captions-trackmenu.html [ Failure ]
 webkit.org/b/123097 media/track/track-user-preferences.html [ Skip ]
 webkit.org/b/121995 media/video-controls-captions-trackmenu-includes-enabled-track.html [ Failure ]
 
-Bug(GTK) http/tests/media/video-play-stall-seek.html [ Skip ]
-
 Bug(GTK) http/tests/misc/acid3.html [ Failure ]
 
 Bug(GTK) media/video-size-intrinsic-scale.html [ Failure ]
index b71aad3..4570a2b 100644 (file)
@@ -1,13 +1,14 @@
 
+EVENT(canplaythrough)
 EVENT(play)
 EXPECTED (video.duration == 'Infinity') OK
 -webkit-media-text-track-container: classes: [hidden]
 -webkit-media-controls-enclosure: classes: []
--webkit-media-controls-panel: classes: [paused]
+-webkit-media-controls-panel: classes: [paused show]
 -webkit-media-controls-play-button: classes: [paused]
 -webkit-media-controls-timeline: classes: []
 -webkit-media-controls-current-time-display: classes: [hour-long-time]
--webkit-media-controls-time-remaining-display: classes: [hidden hour-long-time]
+-webkit-media-controls-time-remaining-display: classes: [hour-long-time hidden]
 -webkit-media-controls-toggle-closed-captions-button: classes: [hidden]
 -webkit-media-controls-fullscreen-button: classes: []
 none: classes: [mute-box]
index 15b428d..e7070d6 100644 (file)
@@ -1,3 +1,42 @@
+2019-03-18  Philippe Normand  <pnormand@igalia.com>
+
+        [GStreamer] Rewrite HTTP source element using pushsrc base class
+        https://bugs.webkit.org/show_bug.cgi?id=195631
+
+        Reviewed by Xabier Rodriguez-Calvar.
+
+        If we want to use webkitwebsrc in adaptivedemux (HLS, DASH, etc)
+        we need a source element that behaves like souphttpsrc, which is
+        implemented using pushsrc. This rewrite might also fix some seek
+        issues.
+
+        No new tests, existing http/tests/media tests cover this patch.
+
+        * platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:
+        (WebCore::MediaPlayerPrivateGStreamer::handleMessage):
+        * platform/graphics/gstreamer/WebKitWebSourceGStreamer.cpp:
+        (webkit_web_src_class_init):
+        (webkitWebSrcReset):
+        (webkit_web_src_init):
+        (webKitWebSrcCreate):
+        (webKitWebSrcStart):
+        (webKitWebSrcCloseSession):
+        (webKitWebSrcStop):
+        (webKitWebSrcGetSize):
+        (webKitWebSrcIsSeekable):
+        (webKitWebSrcDoSeek):
+        (webKitWebSrcQuery):
+        (webKitWebSrcUnLock):
+        (webKitWebSrcUnLockStop):
+        (webKitWebSrcChangeState):
+        (CachedResourceStreamingClient::checkUpdateBlocksize):
+        (CachedResourceStreamingClient::responseReceived):
+        (CachedResourceStreamingClient::dataReceived):
+        (CachedResourceStreamingClient::accessControlCheckFailed):
+        (CachedResourceStreamingClient::loadFailed):
+        (CachedResourceStreamingClient::loadFinished):
+        * platform/graphics/gstreamer/WebKitWebSourceGStreamer.h:
+
 2019-03-18  Gyuyoung Kim  <gyuyoung.kim@webkit.org>
 
         [WPE][GTK] Fix a build warning because of missing to handle an enum value
index 5eada63..92516c2 100644 (file)
@@ -63,6 +63,24 @@ public:
         });
     }
 
+    template<typename F>
+    void notifyAndWait(T notificationType, F&& callbackFunctor)
+    {
+        Lock mutex;
+        Condition condition;
+
+        notify(notificationType, [functor = WTFMove(callbackFunctor), &condition, &mutex] {
+            functor();
+            LockHolder holder(mutex);
+            condition.notifyOne();
+        });
+
+        if (!isMainThread()) {
+            LockHolder holder(mutex);
+            condition.wait(mutex);
+        }
+    }
+
     void cancelPendingNotifications(unsigned mask = 0)
     {
         ASSERT(m_isValid.load());
index 6bdba02..1fee962 100644 (file)
@@ -533,13 +533,17 @@ void MediaPlayerPrivateGStreamer::seek(const MediaTime& mediaTime)
     GST_INFO_OBJECT(pipeline(), "[Seek] seek attempt to %s", toString(mediaTime).utf8().data());
 
     // Avoid useless seeking.
-    if (mediaTime == currentMediaTime())
+    if (mediaTime == currentMediaTime()) {
+        GST_DEBUG_OBJECT(pipeline(), "[Seek] seek to EOS position unhandled");
         return;
+    }
 
     MediaTime time = std::min(mediaTime, durationMediaTime());
 
-    if (isLiveStream())
+    if (isLiveStream()) {
+        GST_DEBUG_OBJECT(pipeline(), "[Seek] Live stream seek unhandled");
         return;
+    }
 
     GST_INFO_OBJECT(pipeline(), "[Seek] seeking to %s", toString(time).utf8().data());
 
@@ -1196,7 +1200,7 @@ void MediaPlayerPrivateGStreamer::handleMessage(GstMessage* message)
     // We ignore state changes from internal elements. They are forwarded to playbin2 anyway.
     bool messageSourceIsPlaybin = GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get());
 
-    GST_LOG("Message %s received from element %s", GST_MESSAGE_TYPE_NAME(message), GST_MESSAGE_SRC_NAME(message));
+    GST_LOG_OBJECT(pipeline(), "Message %s received from element %s", GST_MESSAGE_TYPE_NAME(message), GST_MESSAGE_SRC_NAME(message));
     switch (GST_MESSAGE_TYPE(message)) {
     case GST_MESSAGE_ERROR:
         if (m_resetPipeline || !m_missingPluginCallbacks.isEmpty() || m_errorOccured)
@@ -1342,14 +1346,33 @@ void MediaPlayerPrivateGStreamer::handleMessage(GstMessage* message)
         }
 #endif
         else if (gst_structure_has_name(structure, "http-headers")) {
-            GstStructure* responseHeaders;
-            if (gst_structure_get(structure, "response-headers", GST_TYPE_STRUCTURE, &responseHeaders, nullptr)) {
-                if (!gst_structure_has_field(responseHeaders, httpHeaderNameString(HTTPHeaderName::ContentLength).utf8().data())) {
-                    GST_INFO_OBJECT(pipeline(), "Live stream detected. Disabling on-disk buffering");
+            if (const char* uri = gst_structure_get_string(structure, "uri")) {
+                URL url(URL(), uri);
+                convertToInternalProtocol(url);
+                if (url != m_url) {
+                    GST_DEBUG_OBJECT(pipeline(), "Ignoring HTTP response headers for non-main URI.");
+                    break;
+                }
+            }
+            GUniqueOutPtr<GstStructure> responseHeaders;
+            if (gst_structure_get(structure, "response-headers", GST_TYPE_STRUCTURE, &responseHeaders.outPtr(), nullptr)) {
+                const char* contentLengthHeaderName = httpHeaderNameString(HTTPHeaderName::ContentLength).utf8().data();
+                uint64_t contentLength = 0;
+                if (!gst_structure_get_uint64(responseHeaders.get(), contentLengthHeaderName, &contentLength)) {
+                    // souphttpsrc sets a string for Content-Length, so
+                    // handle it here, until we remove the webkit+ protocol
+                    // prefix from webkitwebsrc.
+                    if (const char* contentLengthAsString = gst_structure_get_string(responseHeaders.get(), contentLengthHeaderName)) {
+                        contentLength = g_ascii_strtoull(contentLengthAsString, nullptr, 10);
+                        if (contentLength == G_MAXUINT64)
+                            contentLength = 0;
+                    }
+                }
+                GST_INFO_OBJECT(pipeline(), "%s stream detected", !contentLength ? "Live" : "Non-live");
+                if (!contentLength) {
                     m_isStreaming = true;
                     setDownloadBuffering();
                 }
-                gst_structure_free(responseHeaders);
             }
         } else if (gst_structure_has_name(structure, "adaptive-streaming-statistics")) {
             if (WEBKIT_IS_WEB_SRC(m_source.get()))
@@ -1647,9 +1670,13 @@ void MediaPlayerPrivateGStreamer::fillTimerFired()
 
 MediaTime MediaPlayerPrivateGStreamer::maxMediaTimeSeekable() const
 {
+    GST_TRACE_OBJECT(pipeline(), "errorOccured: %s, isLiveStream: %s", boolForPrinting(m_errorOccured), boolForPrinting(isLiveStream()));
     if (m_errorOccured)
         return MediaTime::zeroTime();
 
+    if (isLiveStream())
+        return MediaTime::zeroTime();
+
     MediaTime duration = durationMediaTime();
     GST_DEBUG_OBJECT(pipeline(), "maxMediaTimeSeekable, duration: %s", toString(duration).utf8().data());
     // infinite duration means live stream
index 52aa836..574a2a5 100644 (file)
@@ -1,6 +1,7 @@
 /*
  *  Copyright (C) 2009, 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *  Copyright (C) 2013 Collabora Ltd.
+ *  Copyright (C) 2019 Igalia S.L.
  *
  *  This library is free software; you can redistribute it and/or
  *  modify it under the terms of the GNU Lesser General Public
@@ -32,8 +33,7 @@
 #include "ResourceResponse.h"
 #include "SecurityOrigin.h"
 #include <cstdint>
-#include <gst/app/gstappsrc.h>
-#include <gst/pbutils/missing-plugins.h>
+#include <wtf/Condition.h>
 #include <wtf/text/CString.h>
 
 using namespace WebCore;
@@ -73,41 +73,43 @@ private:
 enum MainThreadSourceNotification {
     Start = 1 << 0,
     Stop = 1 << 1,
-    NeedData = 1 << 2,
-    EnoughData = 1 << 3,
-    Seek = 1 << 4
 };
 
 #define WEBKIT_WEB_SRC_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), WEBKIT_TYPE_WEB_SRC, WebKitWebSrcPrivate))
 struct _WebKitWebSrcPrivate {
-    GstAppSrc* appsrc;
-    GstPad* srcpad;
     CString originalURI;
     CString redirectedURI;
     bool keepAlive;
     GUniquePtr<GstStructure> extraHeaders;
     bool compress;
     GUniquePtr<gchar> httpMethod;
-
     WebCore::MediaPlayer* player;
-
     RefPtr<PlatformMediaResourceLoader> loader;
     RefPtr<PlatformMediaResource> resource;
-
+    RefPtr<MainThreadNotifier<MainThreadSourceNotification>> notifier;
     bool didPassAccessControlCheck;
-
-    guint64 offset;
+    bool wereHeadersReceived;
+    Condition headersCondition;
+    Lock responseLock;
+    bool wasResponseReceived;
+    Condition responseCondition;
+    bool doesHaveEOS;
+    bool isFlushing { false };
+    uint64_t readPosition;
+    uint64_t requestedPosition;
+    uint64_t stopPosition;
+    bool isDurationSet;
     bool haveSize;
-    guint64 size;
-    gboolean seekable;
-    bool paused;
+    uint64_t size;
+    bool isSeekable;
     bool isSeeking;
-
-    guint64 requestedOffset;
-
+    bool wasSeeking { false };
     uint64_t minimumBlocksize;
-
-    RefPtr<MainThreadNotifier<MainThreadSourceNotification>> notifier;
+    Lock adapterLock;
+    Condition adapterCondition;
+    uint64_t queueSize { 0 };
+    GRefPtr<GstAdapter> adapter;
+    GRefPtr<GstEvent> httpHeadersEvent;
 };
 
 enum {
@@ -120,10 +122,7 @@ enum {
     PROP_METHOD
 };
 
-static GstStaticPadTemplate srcTemplate = GST_STATIC_PAD_TEMPLATE("src",
-                                                                  GST_PAD_SRC,
-                                                                  GST_PAD_ALWAYS,
-                                                                  GST_STATIC_CAPS_ANY);
+static GstStaticPadTemplate srcTemplate = GST_STATIC_PAD_TEMPLATE("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
 
 GST_DEBUG_CATEGORY_STATIC(webkit_web_src_debug);
 #define GST_CAT_DEFAULT webkit_web_src_debug
@@ -135,50 +134,37 @@ static void webKitWebSrcFinalize(GObject*);
 static void webKitWebSrcSetProperty(GObject*, guint propertyID, const GValue*, GParamSpec*);
 static void webKitWebSrcGetProperty(GObject*, guint propertyID, GValue*, GParamSpec*);
 static GstStateChangeReturn webKitWebSrcChangeState(GstElement*, GstStateChange);
-
-static gboolean webKitWebSrcQueryWithParent(GstPad*, GstObject*, GstQuery*);
-
-static void webKitWebSrcNeedData(WebKitWebSrc*);
-static void webKitWebSrcEnoughData(WebKitWebSrc*);
-static gboolean webKitWebSrcSeek(WebKitWebSrc*, guint64);
-
-static GstAppSrcCallbacks appsrcCallbacks = {
-    // need_data
-    [](GstAppSrc*, guint, gpointer userData) {
-        webKitWebSrcNeedData(WEBKIT_WEB_SRC(userData));
-    },
-    // enough_data
-    [](GstAppSrc*, gpointer userData) {
-        webKitWebSrcEnoughData(WEBKIT_WEB_SRC(userData));
-    },
-    // seek_data
-    [](GstAppSrc*, guint64 offset, gpointer userData) -> gboolean {
-        return webKitWebSrcSeek(WEBKIT_WEB_SRC(userData), offset);
-    },
-    { nullptr }
-};
+static GstFlowReturn webKitWebSrcCreate(GstPushSrc*, GstBuffer**);
+static gboolean webKitWebSrcStart(GstBaseSrc*);
+static gboolean webKitWebSrcStop(GstBaseSrc*);
+static gboolean webKitWebSrcGetSize(GstBaseSrc*, guint64* size);
+static gboolean webKitWebSrcIsSeekable(GstBaseSrc*);
+static gboolean webKitWebSrcDoSeek(GstBaseSrc*, GstSegment*);
+static gboolean webKitWebSrcQuery(GstBaseSrc*, GstQuery*);
+static gboolean webKitWebSrcUnLock(GstBaseSrc*);
+static gboolean webKitWebSrcUnLockStop(GstBaseSrc*);
 
 #define webkit_web_src_parent_class parent_class
 // We split this out into another macro to avoid a check-webkit-style error.
 #define WEBKIT_WEB_SRC_CATEGORY_INIT GST_DEBUG_CATEGORY_INIT(webkit_web_src_debug, "webkitwebsrc", 0, "websrc element");
-G_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_BIN,
-                         G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, webKitWebSrcUriHandlerInit);
-                         WEBKIT_WEB_SRC_CATEGORY_INIT);
+G_DEFINE_TYPE_WITH_CODE(WebKitWebSrc, webkit_web_src, GST_TYPE_PUSH_SRC,
+    G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, webKitWebSrcUriHandlerInit);
+    WEBKIT_WEB_SRC_CATEGORY_INIT);
 
 static void webkit_web_src_class_init(WebKitWebSrcClass* klass)
 {
     GObjectClass* oklass = G_OBJECT_CLASS(klass);
-    GstElementClass* eklass = GST_ELEMENT_CLASS(klass);
 
     oklass->dispose = webKitWebSrcDispose;
     oklass->finalize = webKitWebSrcFinalize;
     oklass->set_property = webKitWebSrcSetProperty;
     oklass->get_property = webKitWebSrcGetProperty;
 
-    gst_element_class_add_pad_template(eklass,
-                                       gst_static_pad_template_get(&srcTemplate));
+    GstElementClass* eklass = GST_ELEMENT_CLASS(klass);
+    gst_element_class_add_static_pad_template(eklass, &srcTemplate);
+
     gst_element_class_set_metadata(eklass, "WebKit Web source element", "Source", "Handles HTTP/HTTPS uris",
-                               "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
+        "Philippe Normand <philn@igalia.com>");
 
     /* Allows setting the uri using the 'location' property, which is used
      * for example by gst_element_make_from_uri() */
@@ -206,11 +192,38 @@ static void webkit_web_src_class_init(WebKitWebSrcClass* klass)
         g_param_spec_string("method", "method", "The HTTP method to use (default: GET)",
             nullptr, static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
 
-    eklass->change_state = webKitWebSrcChangeState;
+    eklass->change_state = GST_DEBUG_FUNCPTR(webKitWebSrcChangeState);
+
+    GstBaseSrcClass* baseSrcClass = GST_BASE_SRC_CLASS(klass);
+    baseSrcClass->start = GST_DEBUG_FUNCPTR(webKitWebSrcStart);
+    baseSrcClass->stop = GST_DEBUG_FUNCPTR(webKitWebSrcStop);
+    baseSrcClass->unlock = GST_DEBUG_FUNCPTR(webKitWebSrcUnLock);
+    baseSrcClass->unlock_stop = GST_DEBUG_FUNCPTR(webKitWebSrcUnLockStop);
+    baseSrcClass->get_size = GST_DEBUG_FUNCPTR(webKitWebSrcGetSize);
+    baseSrcClass->is_seekable = GST_DEBUG_FUNCPTR(webKitWebSrcIsSeekable);
+    baseSrcClass->do_seek = GST_DEBUG_FUNCPTR(webKitWebSrcDoSeek);
+    baseSrcClass->query = GST_DEBUG_FUNCPTR(webKitWebSrcQuery);
+
+    GstPushSrcClass* pushSrcClass = GST_PUSH_SRC_CLASS(klass);
+    pushSrcClass->create = GST_DEBUG_FUNCPTR(webKitWebSrcCreate);
 
     g_type_class_add_private(klass, sizeof(WebKitWebSrcPrivate));
 }
 
+
+static void webkitWebSrcReset(WebKitWebSrc* src)
+{
+    WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC_GET_PRIVATE(src);
+
+    priv->haveSize = false;
+    priv->wereHeadersReceived = false;
+    priv->isSeekable = false;
+    priv->readPosition = 0;
+    priv->requestedPosition = 0;
+    priv->stopPosition = -1;
+    priv->size = 0;
+}
+
 static void webkit_web_src_init(WebKitWebSrc* src)
 {
     WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC_GET_PRIVATE(src);
@@ -219,54 +232,11 @@ static void webkit_web_src_init(WebKitWebSrc* src)
     new (priv) WebKitWebSrcPrivate();
 
     priv->notifier = MainThreadNotifier<MainThreadSourceNotification>::create();
+    priv->adapter = adoptGRef(gst_adapter_new());
+    priv->minimumBlocksize = gst_base_src_get_blocksize(GST_BASE_SRC_CAST(src));
 
-    priv->haveSize = FALSE;
-    priv->size = 0;
-
-    priv->appsrc = GST_APP_SRC(gst_element_factory_make("appsrc", nullptr));
-    if (!priv->appsrc) {
-        GST_ERROR_OBJECT(src, "Failed to create appsrc");
-        return;
-    }
-
-    gst_bin_add(GST_BIN(src), GST_ELEMENT(priv->appsrc));
-
-    GRefPtr<GstPad> targetPad = adoptGRef(gst_element_get_static_pad(GST_ELEMENT(priv->appsrc), "src"));
-    priv->srcpad = webkitGstGhostPadFromStaticTemplate(&srcTemplate, "src", targetPad.get());
-
-    gst_element_add_pad(GST_ELEMENT(src), priv->srcpad);
-
-    GST_OBJECT_FLAG_SET(priv->srcpad, GST_PAD_FLAG_NEED_PARENT);
-    gst_pad_set_query_function(priv->srcpad, webKitWebSrcQueryWithParent);
-
-    gst_app_src_set_callbacks(priv->appsrc, &appsrcCallbacks, src, nullptr);
-    gst_app_src_set_emit_signals(priv->appsrc, FALSE);
-    gst_app_src_set_stream_type(priv->appsrc, GST_APP_STREAM_TYPE_SEEKABLE);
-
-    // 512k is a abitrary number but we should choose a value
-    // here to not pause/unpause the SoupMessage too often and
-    // to make sure there's always some data available for
-    // GStreamer to handle.
-    gst_app_src_set_max_bytes(priv->appsrc, 512 * 1024);
-
-    // Emit the need-data signal if the queue contains less
-    // than 20% of data. Without this the need-data signal
-    // is emitted when the queue is empty, we then dispatch
-    // the soup message unpausing to the main loop and from
-    // there unpause the soup message. This already takes
-    // quite some time and libsoup even needs some more time
-    // to actually provide data again. If we do all this
-    // already if the queue is 20% empty, it's much more
-    // likely that libsoup already provides new data before
-    // the queue is really empty.
-    // This might need tweaking for ports not using libsoup.
-    g_object_set(priv->appsrc, "min-percent", 20, nullptr);
-
-    gst_base_src_set_automatic_eos(GST_BASE_SRC(priv->appsrc), FALSE);
-
-    gst_app_src_set_caps(priv->appsrc, nullptr);
-
-    priv->minimumBlocksize = gst_base_src_get_blocksize(GST_BASE_SRC_CAST(priv->appsrc));
+    webkitWebSrcReset(src);
+    gst_base_src_set_automatic_eos(GST_BASE_SRC_CAST(src), FALSE);
 }
 
 static void webKitWebSrcDispose(GObject* object)
@@ -347,46 +317,104 @@ static void webKitWebSrcGetProperty(GObject* object, guint propID, GValue* value
     }
 }
 
-static void webKitWebSrcStop(WebKitWebSrc* src)
+static GstFlowReturn webKitWebSrcCreate(GstPushSrc* pushSrc, GstBuffer** buffer)
 {
+    GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(pushSrc);
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
     WebKitWebSrcPrivate* priv = src->priv;
 
-    if (priv->resource || (priv->loader && !priv->keepAlive)) {
-        GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
-        priv->notifier->cancelPendingNotifications(MainThreadSourceNotification::NeedData | MainThreadSourceNotification::EnoughData | MainThreadSourceNotification::Seek);
-        priv->notifier->notify(MainThreadSourceNotification::Stop, [protector, keepAlive = priv->keepAlive] {
-            WebKitWebSrcPrivate* priv = protector->priv;
+    GST_TRACE_OBJECT(src, "readPosition = %" G_GUINT64_FORMAT " requestedPosition = %" G_GUINT64_FORMAT, priv->readPosition, priv->requestedPosition);
 
-            if (priv->resource) {
-                priv->resource->stop();
-                priv->resource->setClient(nullptr);
-                priv->resource = nullptr;
-            }
+    if (priv->requestedPosition != priv->readPosition) {
+        {
+            LockHolder adapterLocker(priv->adapterLock);
+            GST_DEBUG_OBJECT(src, "Seeking, flushing adapter");
+            // Discard all the buffers coming before the requested seek position.
+            gst_adapter_flush(priv->adapter.get(), priv->queueSize);
+            priv->queueSize = 0;
+        }
+        uint64_t requestedPosition = priv->requestedPosition;
+        webKitWebSrcStop(baseSrc);
+        priv->requestedPosition = requestedPosition;
+        webKitWebSrcStart(baseSrc);
+    }
 
-            if (!keepAlive)
-                priv->loader = nullptr;
+    {
+        LockHolder locker(priv->responseLock);
+        priv->responseCondition.wait(priv->responseLock, [priv] () {
+            return priv->wasResponseReceived || priv->isFlushing;
         });
     }
 
-    bool wasSeeking = std::exchange(priv->isSeeking, false);
+    GST_TRACE_OBJECT(src, "flushing: %s, doesHaveEOS: %s, queueSize: %" G_GSIZE_FORMAT, boolForPrinting(priv->isFlushing), boolForPrinting(priv->doesHaveEOS), priv->queueSize);
 
-    priv->paused = false;
+    if (priv->isFlushing) {
+        GST_DEBUG_OBJECT(src, "Flushing");
+        return GST_FLOW_FLUSHING;
+    }
 
-    priv->offset = 0;
+    if (priv->doesHaveEOS) {
+        GST_DEBUG_OBJECT(src, "EOS");
+        return GST_FLOW_EOS;
+    }
 
-    if (!wasSeeking) {
-        priv->requestedOffset = 0;
-        priv->player = nullptr;
-        priv->seekable = FALSE;
+    unsigned size = gst_base_src_get_blocksize(baseSrc);
+    bool isAdapterDrained = false;
+    {
+        LockHolder adapterLocker(priv->adapterLock);
+        unsigned retries = 0;
+        size_t available = gst_adapter_available_fast(priv->adapter.get());
+        while (available < size && !isAdapterDrained) {
+            priv->adapterCondition.waitFor(priv->adapterLock, Seconds(1));
+            retries++;
+            available = gst_adapter_available_fast(priv->adapter.get());
+            if (available && available < size)
+                size = available;
+            else if (retries > 3)
+                isAdapterDrained = true;
+        }
     }
 
-    if (priv->appsrc) {
-        gst_app_src_set_caps(priv->appsrc, nullptr);
-        if (!wasSeeking)
-            gst_app_src_set_size(priv->appsrc, -1);
+    if (isAdapterDrained) {
+        GST_DEBUG_OBJECT(src, "Adapter still empty after 3 seconds of waiting, assuming EOS");
+        return GST_FLOW_EOS;
     }
 
-    GST_DEBUG_OBJECT(src, "Stopped request");
+    if (priv->haveSize && !priv->isDurationSet) {
+        GST_DEBUG_OBJECT(src, "Setting duration to %" G_GUINT64_FORMAT, priv->size);
+        baseSrc->segment.duration = priv->size;
+        priv->isDurationSet = true;
+        gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
+    }
+
+    if (priv->httpHeadersEvent)
+        gst_pad_push_event(GST_BASE_SRC_PAD(baseSrc), priv->httpHeadersEvent.leakRef());
+
+    {
+        GST_TRACE_OBJECT(src, "Taking %u bytes from adapter", size);
+        LockHolder adapterLocker(priv->adapterLock);
+        if (size) {
+            *buffer = gst_adapter_take_buffer_fast(priv->adapter.get(), size);
+            RELEASE_ASSERT(*buffer);
+
+            priv->queueSize -= size;
+
+            GST_BUFFER_OFFSET(*buffer) = baseSrc->segment.position;
+            GST_BUFFER_OFFSET_END(*buffer) = GST_BUFFER_OFFSET(*buffer) + size;
+            GST_TRACE_OBJECT(src, "Buffer bounds set to %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, GST_BUFFER_OFFSET(*buffer), GST_BUFFER_OFFSET_END(*buffer));
+            GST_TRACE_OBJECT(src, "doesHaveEOS: %s, wasSeeking: %s, seeking: %s, size: %u", boolForPrinting(priv->doesHaveEOS), boolForPrinting(priv->wasSeeking), boolForPrinting(priv->isSeeking), size);
+            if (priv->haveSize && GST_BUFFER_OFFSET_END(*buffer) >= priv->size) {
+                if (priv->wasSeeking)
+                    priv->wasSeeking = false;
+                else
+                    priv->doesHaveEOS = true;
+            } else if (priv->wasSeeking)
+                priv->wasSeeking = false;
+        } else
+            GST_ERROR_OBJECT(src, "Empty adapter?");
+    }
+
+    return GST_FLOW_OK;
 }
 
 static bool webKitWebSrcSetExtraHeader(GQuark fieldId, const GValue* value, gpointer userData)
@@ -440,17 +468,30 @@ static gboolean webKitWebSrcProcessExtraHeaders(GQuark fieldId, const GValue* va
     return webKitWebSrcSetExtraHeader(fieldId, value, userData);
 }
 
-static void webKitWebSrcStart(WebKitWebSrc* src)
+static gboolean webKitWebSrcStart(GstBaseSrc* baseSrc)
 {
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
     WebKitWebSrcPrivate* priv = src->priv;
     ASSERT(priv->player);
 
+    priv->wereHeadersReceived = false;
+    priv->wasResponseReceived = false;
+    priv->isDurationSet = false;
+    priv->doesHaveEOS = false;
+    priv->isFlushing = false;
+
     priv->didPassAccessControlCheck = false;
 
     if (priv->originalURI.isNull()) {
         GST_ERROR_OBJECT(src, "No URI provided");
-        webKitWebSrcStop(src);
-        return;
+        webKitWebSrcStop(baseSrc);
+        return FALSE;
+    }
+
+    if (priv->requestedPosition == priv->stopPosition) {
+        GST_DEBUG_OBJECT(src, "Empty segment, signaling EOS");
+        priv->doesHaveEOS = true;
+        return FALSE;
     }
 
     GST_DEBUG_OBJECT(src, "Fetching %s", priv->originalURI.data());
@@ -484,16 +525,16 @@ static void webKitWebSrcStart(WebKitWebSrc* src)
         || !g_ascii_strcasecmp("trailers.apple.com", url.host().utf8().data()))
         request.setHTTPUserAgent("Quicktime/7.6.6");
 
-    if (priv->requestedOffset) {
-        GUniquePtr<gchar> val(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", priv->requestedOffset));
-        request.setHTTPHeaderField(HTTPHeaderName::Range, val.get());
+    if (priv->requestedPosition) {
+        GUniquePtr<char> formatedRange(g_strdup_printf("bytes=%" G_GUINT64_FORMAT "-", priv->requestedPosition));
+        GST_DEBUG_OBJECT(src, "Range request: %s", formatedRange.get());
+        request.setHTTPHeaderField(HTTPHeaderName::Range, formatedRange.get());
     }
-    priv->offset = priv->requestedOffset;
+    priv->readPosition = priv->requestedPosition;
 
     GST_DEBUG_OBJECT(src, "Persistent connection support %s", priv->keepAlive ? "enabled" : "disabled");
-    if (!priv->keepAlive) {
+    if (!priv->keepAlive)
         request.setHTTPHeaderField(HTTPHeaderName::Connection, "close");
-    }
 
     if (priv->extraHeaders)
         gst_structure_foreach(priv->extraHeaders.get(), webKitWebSrcProcessExtraHeaders, &request);
@@ -502,9 +543,8 @@ static void webKitWebSrcStart(WebKitWebSrc* src)
     request.setHTTPHeaderField(HTTPHeaderName::IcyMetadata, "1");
 
     GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
-    priv->notifier->notify(MainThreadSourceNotification::Start, [protector, request = WTFMove(request)] {
+    priv->notifier->notifyAndWait(MainThreadSourceNotification::Start, [protector, request = WTFMove(request)] {
         WebKitWebSrcPrivate* priv = protector->priv;
-
         if (!priv->loader)
             priv->loader = priv->player->createResourceLoader();
 
@@ -518,90 +558,172 @@ static void webKitWebSrcStart(WebKitWebSrc* src)
         } else {
             GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client");
             priv->loader = nullptr;
-            webKitWebSrcStop(protector.get());
         }
     });
+
+    GST_DEBUG_OBJECT(src, "Resource loader started");
+    return TRUE;
 }
 
-static GstStateChangeReturn webKitWebSrcChangeState(GstElement* element, GstStateChange transition)
+static void webKitWebSrcCloseSession(WebKitWebSrc* src)
 {
-    GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
-    WebKitWebSrc* src = WEBKIT_WEB_SRC(element);
     WebKitWebSrcPrivate* priv = src->priv;
+    GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
 
-    switch (transition) {
-    case GST_STATE_CHANGE_NULL_TO_READY:
-        if (!priv->appsrc) {
-            gst_element_post_message(element,
-                                     gst_missing_element_message_new(element, "appsrc"));
-            GST_ELEMENT_ERROR(src, CORE, MISSING_PLUGIN, (nullptr), ("no appsrc"));
-            return GST_STATE_CHANGE_FAILURE;
+    priv->notifier->notifyAndWait(MainThreadSourceNotification::Stop, [protector, keepAlive = priv->keepAlive] {
+        WebKitWebSrcPrivate* priv = protector->priv;
+
+        GST_DEBUG_OBJECT(protector.get(), "Stopping resource loader");
+
+        if (priv->resource) {
+            priv->resource->stop();
+            priv->resource->setClient(nullptr);
+            priv->resource = nullptr;
         }
-        break;
-    default:
-        break;
+
+        if (!keepAlive)
+            priv->loader = nullptr;
+    });
+
+    GST_DEBUG_OBJECT(src, "Resource loader stopped");
+}
+
+static gboolean webKitWebSrcStop(GstBaseSrc* baseSrc)
+{
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
+    WebKitWebSrcPrivate* priv = src->priv;
+
+    if (priv->resource || (priv->loader && !priv->keepAlive))
+        webKitWebSrcCloseSession(src);
+
+    {
+        LockHolder adapterLocker(priv->adapterLock);
+        gst_adapter_clear(priv->adapter.get());
+        priv->queueSize = 0;
     }
 
-    ret = GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
-    if (G_UNLIKELY(ret == GST_STATE_CHANGE_FAILURE)) {
-        GST_DEBUG_OBJECT(src, "State change failed");
-        return ret;
+    webkitWebSrcReset(src);
+    GST_DEBUG_OBJECT(src, "Stopped request");
+    return TRUE;
+}
+
+static gboolean webKitWebSrcGetSize(GstBaseSrc* baseSrc, guint64* size)
+{
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
+    WebKitWebSrcPrivate* priv = src->priv;
+
+    GST_DEBUG_OBJECT(src, "haveSize: %s, size: %" G_GUINT64_FORMAT, boolForPrinting(priv->haveSize), priv->size);
+    if (priv->haveSize) {
+        *size = priv->size;
+        return TRUE;
     }
 
-    switch (transition) {
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
-    {
-        GST_DEBUG_OBJECT(src, "READY->PAUSED");
-        webKitWebSrcStart(src);
-        break;
+    return FALSE;
+}
+
+static gboolean webKitWebSrcIsSeekable(GstBaseSrc* baseSrc)
+{
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
+
+    GST_DEBUG_OBJECT(src, "isSeekable: %s", boolForPrinting(src->priv->isSeekable));
+    return src->priv->isSeekable;
+}
+
+static gboolean webKitWebSrcDoSeek(GstBaseSrc* baseSrc, GstSegment* segment)
+{
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
+    WebKitWebSrcPrivate* priv = src->priv;
+    LockHolder locker(priv->responseLock);
+
+    GST_DEBUG_OBJECT(src, "Seek segment: (%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT ")", segment->start, segment->stop);
+    if (priv->readPosition == segment->start && priv->requestedPosition == priv->readPosition && priv->stopPosition == segment->stop) {
+        GST_DEBUG_OBJECT(src, "Seek to current read/end position and no seek pending");
+        return TRUE;
     }
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-    {
-        GST_DEBUG_OBJECT(src, "PAUSED->READY");
-        webKitWebSrcStop(src);
-        break;
+
+    if (priv->wereHeadersReceived && !priv->isSeekable) {
+        GST_WARNING_OBJECT(src, "Not seekable");
+        return FALSE;
     }
-    default:
-        break;
+
+    if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
+        GST_WARNING_OBJECT(src, "Invalid seek segment");
+        return FALSE;
     }
 
-    return ret;
+    if (priv->haveSize && segment->start >= priv->size)
+        GST_WARNING_OBJECT(src, "Potentially seeking behind end of file, might EOS immediately");
+
+    priv->isSeeking = true;
+    priv->requestedPosition = segment->start;
+    priv->stopPosition = segment->stop;
+    return TRUE;
 }
 
-static gboolean webKitWebSrcQueryWithParent(GstPad* pad, GstObject* parent, GstQuery* query)
+static gboolean webKitWebSrcQuery(GstBaseSrc* baseSrc, GstQuery* query)
 {
-    WebKitWebSrc* src = WEBKIT_WEB_SRC(GST_ELEMENT(parent));
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
     WebKitWebSrcPrivate* priv = src->priv;
     gboolean result = FALSE;
 
-    switch (GST_QUERY_TYPE(query)) {
-    case GST_QUERY_URI: {
+    if (GST_QUERY_TYPE(query) == GST_QUERY_URI) {
         gst_query_set_uri(query, priv->originalURI.data());
         if (!priv->redirectedURI.isNull())
             gst_query_set_uri_redirection(query, priv->redirectedURI.data());
         result = TRUE;
-        break;
     }
-    case GST_QUERY_SCHEDULING: {
+
+    if (!result)
+        result = GST_BASE_SRC_CLASS(parent_class)->query(baseSrc, query);
+
+    if (GST_QUERY_TYPE(query) == GST_QUERY_SCHEDULING) {
         GstSchedulingFlags flags;
         int minSize, maxSize, align;
 
         gst_query_parse_scheduling(query, &flags, &minSize, &maxSize, &align);
         gst_query_set_scheduling(query, static_cast<GstSchedulingFlags>(flags | GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED), minSize, maxSize, align);
-        result = TRUE;
-        break;
     }
-    default: {
-        GRefPtr<GstPad> target = adoptGRef(gst_ghost_pad_get_target(GST_GHOST_PAD_CAST(pad)));
 
-        // Forward the query to the proxy target pad.
-        if (target)
-            result = gst_pad_query(target.get(), query);
+    return result;
+}
+
+static gboolean webKitWebSrcUnLock(GstBaseSrc* baseSrc)
+{
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
+    LockHolder locker(src->priv->responseLock);
+
+    GST_DEBUG_OBJECT(src, "Unlock");
+    src->priv->isFlushing = true;
+    src->priv->responseCondition.notifyOne();
+    return TRUE;
+}
+
+static gboolean webKitWebSrcUnLockStop(GstBaseSrc* baseSrc)
+{
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(baseSrc);
+    LockHolder locker(src->priv->responseLock);
+    GST_DEBUG_OBJECT(src, "Unlock stop");
+    src->priv->isFlushing = false;
+
+    return TRUE;
+}
+
+static GstStateChangeReturn webKitWebSrcChangeState(GstElement* element, GstStateChange transition)
+{
+    WebKitWebSrc* src = WEBKIT_WEB_SRC(element);
+
+#if GST_CHECK_VERSION(1, 14, 0)
+    GST_DEBUG_OBJECT(src, gst_state_change_get_name(transition));
+#endif
+    switch (transition) {
+    case GST_STATE_CHANGE_READY_TO_NULL:
+        webKitWebSrcCloseSession(src);
+        break;
+    default:
         break;
-    }
     }
 
-    return result;
+    return GST_ELEMENT_CLASS(parent_class)->change_state(element, transition);
 }
 
 static bool urlHasSupportedProtocol(const URL& url)
@@ -672,61 +794,6 @@ static void webKitWebSrcUriHandlerInit(gpointer gIface, gpointer)
     iface->set_uri = webKitWebSrcSetUri;
 }
 
-static void webKitWebSrcNeedData(WebKitWebSrc* src)
-{
-    WebKitWebSrcPrivate* priv = src->priv;
-
-    GST_LOG_OBJECT(src, "Need more data");
-
-    if (!priv->paused)
-        return;
-    priv->paused = false;
-
-    GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
-    priv->notifier->notify(MainThreadSourceNotification::NeedData, [protector] { });
-}
-
-static void webKitWebSrcEnoughData(WebKitWebSrc* src)
-{
-    WebKitWebSrcPrivate* priv = src->priv;
-
-    GST_DEBUG_OBJECT(src, "Have enough data");
-
-    if (priv->paused)
-        return;
-    priv->paused = true;
-
-    GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
-    priv->notifier->notify(MainThreadSourceNotification::EnoughData, [protector] {
-        WebKitWebSrcPrivate* priv = protector->priv;
-        if (priv->resource)
-            priv->resource->stop();
-    });
-}
-
-static gboolean webKitWebSrcSeek(WebKitWebSrc* src, guint64 offset)
-{
-    WebKitWebSrcPrivate* priv = src->priv;
-
-    if (offset == priv->offset && priv->requestedOffset == priv->offset)
-        return TRUE;
-
-    if (!priv->seekable)
-        return FALSE;
-
-    priv->isSeeking = true;
-    priv->requestedOffset = offset;
-
-    GST_DEBUG_OBJECT(src, "Seeking to offset: %" G_GUINT64_FORMAT, src->priv->requestedOffset);
-
-    GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
-    priv->notifier->notify(MainThreadSourceNotification::Seek, [protector] {
-        webKitWebSrcStop(protector.get());
-        webKitWebSrcStart(protector.get());
-    });
-    return TRUE;
-}
-
 void webKitWebSrcSetMediaPlayer(WebKitWebSrc* src, WebCore::MediaPlayer* player)
 {
     ASSERT(player);
@@ -749,10 +816,11 @@ CachedResourceStreamingClient::~CachedResourceStreamingClient() = default;
 void CachedResourceStreamingClient::checkUpdateBlocksize(uint64_t bytesRead)
 {
     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
+    GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(src);
     WebKitWebSrcPrivate* priv = src->priv;
 
-    uint64_t blocksize = gst_base_src_get_blocksize(GST_BASE_SRC_CAST(priv->appsrc));
-    GST_LOG_OBJECT(src, "Checking to update blocksize. Read:%" PRIu64 " blocksize:%" PRIu64, bytesRead, blocksize);
+    uint64_t blocksize = gst_base_src_get_blocksize(baseSrc);
+    GST_LOG_OBJECT(src, "Checking to update blocksize. Read: %" PRIu64 ", current blocksize: %" PRIu64, bytesRead, blocksize);
 
     if (bytesRead >= blocksize * s_growBlocksizeLimit) {
         m_reduceBlocksizeCount = 0;
@@ -761,7 +829,7 @@ void CachedResourceStreamingClient::checkUpdateBlocksize(uint64_t bytesRead)
         if (m_increaseBlocksizeCount >= s_growBlocksizeCount) {
             blocksize *= s_growBlocksizeFactor;
             GST_DEBUG_OBJECT(src, "Increased blocksize to %" PRIu64, blocksize);
-            gst_base_src_set_blocksize(GST_BASE_SRC_CAST(priv->appsrc), blocksize);
+            gst_base_src_set_blocksize(baseSrc, blocksize);
             m_increaseBlocksizeCount = 0;
         }
     } else if (bytesRead < blocksize * s_reduceBlocksizeLimit) {
@@ -772,7 +840,7 @@ void CachedResourceStreamingClient::checkUpdateBlocksize(uint64_t bytesRead)
             blocksize *= s_reduceBlocksizeFactor;
             blocksize = std::max(blocksize, priv->minimumBlocksize);
             GST_DEBUG_OBJECT(src, "Decreased blocksize to %" PRIu64, blocksize);
-            gst_base_src_set_blocksize(GST_BASE_SRC_CAST(priv->appsrc), blocksize);
+            gst_base_src_set_blocksize(baseSrc, blocksize);
             m_reduceBlocksizeCount = 0;
         }
     } else {
@@ -798,48 +866,44 @@ void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, con
 
     if (response.httpStatusCode() >= 400) {
         GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received %d HTTP error code", response.httpStatusCode()), (nullptr));
-        gst_app_src_end_of_stream(priv->appsrc);
-        webKitWebSrcStop(src);
-        return completionHandler(ShouldContinue::No);
-    }
-
-    if (priv->isSeeking) {
-        GST_DEBUG_OBJECT(src, "Seek in progress, ignoring response");
-        return completionHandler(ShouldContinue::Yes);
+        priv->doesHaveEOS = true;
+        webKitWebSrcStop(GST_BASE_SRC_CAST(src));
+        completionHandler(ShouldContinue::No);
+        return;
     }
 
-    if (priv->requestedOffset) {
+    if (priv->requestedPosition) {
         // Seeking ... we expect a 206 == PARTIAL_CONTENT
         if (response.httpStatusCode() == 200) {
             // Range request didn't have a ranged response; resetting offset.
-            priv->offset = 0;
+            priv->readPosition = 0;
         } else if (response.httpStatusCode() != 206) {
             // Range request completely failed.
             GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received unexpected %d HTTP status code", response.httpStatusCode()), (nullptr));
-            gst_app_src_end_of_stream(priv->appsrc);
-            webKitWebSrcStop(src);
-            return completionHandler(ShouldContinue::No);
+            priv->doesHaveEOS = true;
+            webKitWebSrcStop(GST_BASE_SRC_CAST(src));
+            completionHandler(ShouldContinue::No);
+            return;
+        } else {
+            GST_DEBUG_OBJECT(src, "Range request succeeded");
+            priv->isSeeking = false;
+            priv->wasSeeking = true;
         }
     }
 
     long long length = response.expectedContentLength();
-    if (length > 0 && priv->requestedOffset && response.httpStatusCode() == 206)
-        length += priv->requestedOffset;
+    if (length > 0 && priv->requestedPosition && response.httpStatusCode() == 206)
+        length += priv->requestedPosition;
 
-    priv->seekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
+    priv->isSeekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
 
-    GST_DEBUG_OBJECT(src, "Size: %lld, seekable: %s", length, priv->seekable ? "yes" : "no");
-    // notify size/duration
+    GST_DEBUG_OBJECT(src, "Size: %lld, isSeekable: %s", length, boolForPrinting(priv->isSeekable));
     if (length > 0) {
         if (!priv->haveSize || (static_cast<long long>(priv->size) != length)) {
-            priv->haveSize = TRUE;
+            priv->haveSize = true;
             priv->size = length;
-            gst_app_src_set_size(priv->appsrc, length);
+            priv->isDurationSet = false;
         }
-    } else {
-        gst_app_src_set_size(priv->appsrc, -1);
-        if (!priv->seekable)
-            gst_app_src_set_stream_type(priv->appsrc, GST_APP_STREAM_TYPE_STREAM);
     }
 
     // Signal to downstream if this is an Icecast stream.
@@ -854,118 +918,95 @@ void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, con
             String contentType = response.httpHeaderField(HTTPHeaderName::ContentType);
             GST_DEBUG_OBJECT(src, "Response ContentType: %s", contentType.utf8().data());
             gst_caps_set_simple(caps.get(), "content-type", G_TYPE_STRING, contentType.utf8().data(), nullptr);
-
-            gst_app_src_set_stream_type(priv->appsrc, GST_APP_STREAM_TYPE_STREAM);
         }
     }
 
-    gst_app_src_set_caps(priv->appsrc, caps.get());
-
-    // Emit a GST_EVENT_CUSTOM_DOWNSTREAM_STICKY event and message to let
-    // GStreamer know about the HTTP headers sent and received.
-    GstStructure* httpHeaders = gst_structure_new_empty("http-headers");
-    gst_structure_set(httpHeaders, "uri", G_TYPE_STRING, priv->originalURI.data(),
-        "http-status-code", G_TYPE_UINT, response.httpStatusCode(), nullptr);
-    if (!priv->redirectedURI.isNull())
-        gst_structure_set(httpHeaders, "redirection-uri", G_TYPE_STRING, priv->redirectedURI.data(), nullptr);
-    GUniquePtr<GstStructure> headers(gst_structure_new_empty("request-headers"));
-    for (const auto& header : m_request.httpHeaderFields())
-        gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
-    GST_DEBUG_OBJECT(src, "Request headers going downstream: %" GST_PTR_FORMAT, headers.get());
-    gst_structure_set(httpHeaders, "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
-    headers.reset(gst_structure_new_empty("response-headers"));
-    for (const auto& header : response.httpHeaderFields())
-        gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
-    gst_structure_set(httpHeaders, "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
-    GST_DEBUG_OBJECT(src, "Response headers going downstream: %" GST_PTR_FORMAT, headers.get());
-
-    gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_element(GST_OBJECT_CAST(src),
-        gst_structure_copy(httpHeaders)));
-    gst_pad_push_event(GST_BASE_SRC_PAD(priv->appsrc), gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, httpHeaders));
-    
+    if (caps) {
+        GST_DEBUG_OBJECT(src, "Set caps to %" GST_PTR_FORMAT, caps.get());
+        gst_base_src_set_caps(GST_BASE_SRC_CAST(src), caps.get());
+    }
+
+    {
+        LockHolder locker(priv->responseLock);
+
+        // Emit a GST_EVENT_CUSTOM_DOWNSTREAM_STICKY event and message to let
+        // GStreamer know about the HTTP headers sent and received.
+        GstStructure* httpHeaders = gst_structure_new_empty("http-headers");
+        gst_structure_set(httpHeaders, "uri", G_TYPE_STRING, priv->originalURI.data(),
+            "http-status-code", G_TYPE_UINT, response.httpStatusCode(), nullptr);
+        if (!priv->redirectedURI.isNull())
+            gst_structure_set(httpHeaders, "redirection-uri", G_TYPE_STRING, priv->redirectedURI.data(), nullptr);
+        GUniquePtr<GstStructure> headers(gst_structure_new_empty("request-headers"));
+        for (const auto& header : m_request.httpHeaderFields())
+            gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
+        GST_DEBUG_OBJECT(src, "Request headers going downstream: %" GST_PTR_FORMAT, headers.get());
+        gst_structure_set(httpHeaders, "request-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
+        headers.reset(gst_structure_new_empty("response-headers"));
+        for (const auto& header : response.httpHeaderFields()) {
+            bool ok = false;
+            uint64_t convertedValue = header.value.toUInt64(&ok);
+            if (ok)
+                gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_UINT64, convertedValue, nullptr);
+            else
+                gst_structure_set(headers.get(), header.key.utf8().data(), G_TYPE_STRING, header.value.utf8().data(), nullptr);
+        }
+        auto contentLengthFieldName(httpHeaderNameString(HTTPHeaderName::ContentLength).toString());
+        if (!gst_structure_has_field(headers.get(), contentLengthFieldName.utf8().data()))
+            gst_structure_set(headers.get(), contentLengthFieldName.utf8().data(), G_TYPE_UINT64, static_cast<uint64_t>(length), nullptr);
+        gst_structure_set(httpHeaders, "response-headers", GST_TYPE_STRUCTURE, headers.get(), nullptr);
+        GST_DEBUG_OBJECT(src, "Response headers going downstream: %" GST_PTR_FORMAT, headers.get());
+
+        gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_element(GST_OBJECT_CAST(src), gst_structure_copy(httpHeaders)));
+
+        priv->httpHeadersEvent = adoptGRef(gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, httpHeaders));
+        priv->wereHeadersReceived = true;
+        priv->headersCondition.notifyOne();
+    }
     completionHandler(ShouldContinue::Yes);
 }
 
 void CachedResourceStreamingClient::dataReceived(PlatformMediaResource&, const char* data, int length)
 {
     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
+    GstBaseSrc* baseSrc = GST_BASE_SRC_CAST(src);
     WebKitWebSrcPrivate* priv = src->priv;
 
-    if (priv->isSeeking) {
-        GST_DEBUG_OBJECT(src, "Seek in progress, ignoring data");
-        return;
+    GST_LOG_OBJECT(src, "Have %d bytes of data", length);
+    LockHolder locker(priv->responseLock);
+
+    uint64_t newPosition = priv->readPosition + length;
+    if (LIKELY (priv->requestedPosition == priv->readPosition))
+        priv->requestedPosition = newPosition;
+    priv->readPosition = newPosition;
+
+    uint64_t newSize = 0;
+    if (priv->haveSize && (newPosition > priv->size)) {
+        GST_DEBUG_OBJECT(src, "Got position previous estimated content size (%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", newPosition, priv->size);
+        newSize = newPosition;
+    } else if (!priv->haveSize) {
+        GST_DEBUG_OBJECT(src, "Got initial response without Content-Length, assuming response size as duration.");
+        newSize = length;
+        priv->haveSize = true;
     }
 
-    if (priv->offset < priv->requestedOffset) {
-        // Range request failed; seeking manually.
-        if (priv->offset + length <= priv->requestedOffset) {
-            // Discard all the buffers coming before the requested seek position.
-            priv->offset += length;
-            return;
-        }
-
-        if (priv->offset + length > priv->requestedOffset) {
-            guint64 offset = priv->requestedOffset - priv->offset;
-            data += offset;
-            length -= offset;
-            priv->offset = priv->requestedOffset;
-        }
-
-        priv->requestedOffset = 0;
+    if (newSize) {
+        priv->size = newSize;
+        baseSrc->segment.duration = priv->size;
+        gst_element_post_message(GST_ELEMENT_CAST(src), gst_message_new_duration_changed(GST_OBJECT_CAST(src)));
     }
 
     checkUpdateBlocksize(length);
 
-    uint64_t startingOffset = priv->offset;
-
-    if (priv->requestedOffset == priv->offset)
-        priv->requestedOffset += length;
-    priv->offset += length;
-    // priv->size == 0 if received length on didReceiveResponse < 0.
-    if (priv->size > 0 && priv->offset > priv->size) {
-        GST_DEBUG_OBJECT(src, "Updating internal size from %" G_GUINT64_FORMAT " to %" G_GUINT64_FORMAT, priv->size, priv->offset);
-        gst_app_src_set_size(priv->appsrc, priv->offset);
-        priv->size = priv->offset;
-    }
-
-    // Now split the recv'd buffer into buffers that are of a size basesrc suggests. It is important not
-    // to push buffers that are too large, otherwise incorrect buffering messages can be sent from the
-    // pipeline.
-    uint64_t bufferSize = static_cast<uint64_t>(length);
-    uint64_t blockSize = static_cast<uint64_t>(gst_base_src_get_blocksize(GST_BASE_SRC_CAST(priv->appsrc)));
-    GST_LOG_OBJECT(src, "Splitting the received buffer into %" PRIu64 " blocks", bufferSize / blockSize);
-    for (uint64_t currentOffset = 0; currentOffset < bufferSize; currentOffset += blockSize) {
-        uint64_t subBufferOffset = startingOffset + currentOffset;
-        uint64_t currentOffsetSize = std::min(blockSize, bufferSize - currentOffset);
-
-        GstBuffer* subBuffer = gst_buffer_new_wrapped(g_memdup(data + currentOffset, currentOffsetSize), currentOffsetSize);
-        if (UNLIKELY(!subBuffer)) {
-            GST_ELEMENT_ERROR(src, CORE, FAILED, ("Failed to allocate sub-buffer"), (nullptr));
-            break;
-        }
-
-        GST_TRACE_OBJECT(src, "Sub-buffer bounds: %" PRIu64 " -- %" PRIu64, subBufferOffset, subBufferOffset + currentOffsetSize);
-        GST_BUFFER_OFFSET(subBuffer) = subBufferOffset;
-        GST_BUFFER_OFFSET_END(subBuffer) = subBufferOffset + currentOffsetSize;
+    if (!priv->wasResponseReceived)
+        priv->wasResponseReceived = true;
+    priv->responseCondition.notifyOne();
 
-        if (priv->isSeeking) {
-            GST_TRACE_OBJECT(src, "Stopping buffer appends due to seek");
-            // A seek has happened in the middle of us breaking the
-            // incoming data up from a previous request. Stop pushing
-            // buffers that are now from the incorrect offset.
-            break;
-        }
-
-        // It may be tempting to use a GstBufferList here, but note
-        // that there is a race condition in GstDownloadBuffer during
-        // seek flushes that can cause decoders to read at incorrect
-        // offsets.
-        GstFlowReturn ret = gst_app_src_push_buffer(priv->appsrc, subBuffer);
-
-        if (UNLIKELY(ret != GST_FLOW_OK && ret != GST_FLOW_EOS && ret != GST_FLOW_FLUSHING)) {
-            GST_ELEMENT_ERROR(src, CORE, FAILED, (nullptr), (nullptr));
-            break;
-        }
+    {
+        LockHolder adapterLocker(priv->adapterLock);
+        GstBuffer* buffer = gst_buffer_new_wrapped(g_memdup(data, length), length);
+        priv->queueSize += length;
+        gst_adapter_push(priv->adapter.get(), buffer);
+        priv->adapterCondition.notifyOne();
     }
 }
 
@@ -973,8 +1014,8 @@ void CachedResourceStreamingClient::accessControlCheckFailed(PlatformMediaResour
 {
     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
     GST_ELEMENT_ERROR(src, RESOURCE, READ, ("%s", error.localizedDescription().utf8().data()), (nullptr));
-    gst_app_src_end_of_stream(src->priv->appsrc);
-    webKitWebSrcStop(src);
+    src->priv->doesHaveEOS = true;
+    webKitWebSrcStop(GST_BASE_SRC_CAST(src));
 }
 
 void CachedResourceStreamingClient::loadFailed(PlatformMediaResource&, const ResourceError& error)
@@ -986,7 +1027,7 @@ void CachedResourceStreamingClient::loadFailed(PlatformMediaResource&, const Res
         GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
     }
 
-    gst_app_src_end_of_stream(src->priv->appsrc);
+    src->priv->doesHaveEOS = true;
 }
 
 void CachedResourceStreamingClient::loadFinished(PlatformMediaResource&)
@@ -994,10 +1035,8 @@ void CachedResourceStreamingClient::loadFinished(PlatformMediaResource&)
     WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
     WebKitWebSrcPrivate* priv = src->priv;
 
-    GST_DEBUG_OBJECT(src, "Have EOS");
-
-    if (!priv->isSeeking)
-        gst_app_src_end_of_stream(priv->appsrc);
+    if (priv->isSeeking && !priv->isFlushing)
+        priv->isSeeking = false;
 }
 
 bool webKitSrcWouldTaintOrigin(WebKitWebSrc* src, const SecurityOrigin& origin)
index 35e51b3..c86fb56 100644 (file)
@@ -20,6 +20,7 @@
 
 #if ENABLE(VIDEO) && USE(GSTREAMER)
 
+#include <gst/base/gstpushsrc.h>
 #include <gst/gst.h>
 
 namespace WebCore {
@@ -40,13 +41,13 @@ typedef struct _WebKitWebSrcClass   WebKitWebSrcClass;
 typedef struct _WebKitWebSrcPrivate WebKitWebSrcPrivate;
 
 struct _WebKitWebSrc {
-    GstBin parent;
+    GstPushSrc parent;
 
     WebKitWebSrcPrivate *priv;
 };
 
 struct _WebKitWebSrcClass {
-    GstBinClass parentClass;
+    GstPushSrcClass parentClass;
 };
 
 GType webkit_web_src_get_type(void);