[MSE][GStreamer] Use sentinel buffer to detect end of append
authoraboya@igalia.com <aboya@igalia.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Thu, 27 Sep 2018 16:03:48 +0000 (16:03 +0000)
committeraboya@igalia.com <aboya@igalia.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Thu, 27 Sep 2018 16:03:48 +0000 (16:03 +0000)
https://bugs.webkit.org/show_bug.cgi?id=189924

Reviewed by Philippe Normand.

This patch introduces a new mechanism to detect when an append has
been consumed completely by the demuxer. It takes advantage of the
fact that buffer pushing is synchronous: both the appsrc and the
demuxer live in the same streaming thread. When appsrc pushes a
buffer, it's actually making a qtdemux function call (it calls its
"chain" function). The demuxer will return from that call when it has
finished processing that buffer; only then the control returns to
appsrc, that can push the next buffer.

By pushing an additional buffer and capturing it in a probe we can
detect reliably when the previous buffer has been processed.
Because the pipeline only has one thread, at this point no more frames
can arrive to the appsink.

This replaces the old method of detecting end of append which relied
on the `need-data` event, which is more difficult to handle correctly
because it fires whenever the appsrc is empty (or below a given
level), which also happens when a buffer has not been pushed yet or
in response to a flush.

* platform/graphics/gstreamer/mse/AppendPipeline.cpp:
(WebCore::EndOfAppendMeta::init):
(WebCore::EndOfAppendMeta::transform):
(WebCore::EndOfAppendMeta::free):
(WebCore::AppendPipeline::staticInitialization):
(WebCore::AppendPipeline::AppendPipeline):
(WebCore::AppendPipeline::~AppendPipeline):
(WebCore::AppendPipeline::appsrcEndOfAppendCheckerProbe):
(WebCore::AppendPipeline::handleApplicationMessage):
(WebCore::AppendPipeline::handleEndOfAppend):
(WebCore::AppendPipeline::consumeAppsinkAvailableSamples):
(WebCore::AppendPipeline::resetPipeline):
(WebCore::AppendPipeline::pushNewBuffer):
(WebCore::AppendPipeline::handleAppsrcNeedDataReceived): Deleted.:
(WebCore::AppendPipeline::handleAppsrcAtLeastABufferLeft): Deleted.
(WebCore::AppendPipeline::checkEndOfAppend): Deleted.
(WebCore::AppendPipeline::setAppsrcDataLeavingProbe): Deleted.
(WebCore::AppendPipeline::removeAppsrcDataLeavingProbe): Deleted.
(WebCore::AppendPipeline::reportAppsrcAtLeastABufferLeft): Deleted.
(WebCore::AppendPipeline::reportAppsrcNeedDataReceived): Deleted.
(WebCore::appendPipelineAppsrcDataLeaving): Deleted.
(WebCore::appendPipelineAppsrcNeedData): Deleted.
* platform/graphics/gstreamer/mse/AppendPipeline.h:

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@236547 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Source/WebCore/ChangeLog
Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp
Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.h

index b831ae4..b577883 100644 (file)
@@ -1,3 +1,54 @@
+2018-09-27  Alicia Boya GarcĂ­a  <aboya@igalia.com>
+
+        [MSE][GStreamer] Use sentinel buffer to detect end of append
+        https://bugs.webkit.org/show_bug.cgi?id=189924
+
+        Reviewed by Philippe Normand.
+
+        This patch introduces a new mechanism to detect when an append has
+        been consumed completely by the demuxer. It takes advantage of the
+        fact that buffer pushing is synchronous: both the appsrc and the
+        demuxer live in the same streaming thread. When appsrc pushes a
+        buffer, it's actually making a qtdemux function call (it calls its
+        "chain" function). The demuxer will return from that call when it has
+        finished processing that buffer; only then the control returns to
+        appsrc, that can push the next buffer.
+
+        By pushing an additional buffer and capturing it in a probe we can
+        detect reliably when the previous buffer has been processed.
+        Because the pipeline only has one thread, at this point no more frames
+        can arrive to the appsink.
+
+        This replaces the old method of detecting end of append which relied
+        on the `need-data` event, which is more difficult to handle correctly
+        because it fires whenever the appsrc is empty (or below a given
+        level), which also happens when a buffer has not been pushed yet or
+        in response to a flush.
+
+        * platform/graphics/gstreamer/mse/AppendPipeline.cpp:
+        (WebCore::EndOfAppendMeta::init):
+        (WebCore::EndOfAppendMeta::transform):
+        (WebCore::EndOfAppendMeta::free):
+        (WebCore::AppendPipeline::staticInitialization):
+        (WebCore::AppendPipeline::AppendPipeline):
+        (WebCore::AppendPipeline::~AppendPipeline):
+        (WebCore::AppendPipeline::appsrcEndOfAppendCheckerProbe):
+        (WebCore::AppendPipeline::handleApplicationMessage):
+        (WebCore::AppendPipeline::handleEndOfAppend):
+        (WebCore::AppendPipeline::consumeAppsinkAvailableSamples):
+        (WebCore::AppendPipeline::resetPipeline):
+        (WebCore::AppendPipeline::pushNewBuffer):
+        (WebCore::AppendPipeline::handleAppsrcNeedDataReceived): Deleted.:
+        (WebCore::AppendPipeline::handleAppsrcAtLeastABufferLeft): Deleted.
+        (WebCore::AppendPipeline::checkEndOfAppend): Deleted.
+        (WebCore::AppendPipeline::setAppsrcDataLeavingProbe): Deleted.
+        (WebCore::AppendPipeline::removeAppsrcDataLeavingProbe): Deleted.
+        (WebCore::AppendPipeline::reportAppsrcAtLeastABufferLeft): Deleted.
+        (WebCore::AppendPipeline::reportAppsrcNeedDataReceived): Deleted.
+        (WebCore::appendPipelineAppsrcDataLeaving): Deleted.
+        (WebCore::appendPipelineAppsrcNeedData): Deleted.
+        * platform/graphics/gstreamer/mse/AppendPipeline.h:
+
 2018-09-27  Chris Dumez  <cdumez@apple.com>
 
         The WebContent process should not process incoming IPC while waiting for a sync IPC reply
index 887e3d5..efef7c3 100644 (file)
@@ -46,6 +46,26 @@ GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
 
 namespace WebCore {
 
+GType AppendPipeline::s_endOfAppendMetaType = 0;
+const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr;
+std::once_flag AppendPipeline::s_staticInitializationFlag;
+
+struct EndOfAppendMeta {
+    GstMeta base;
+    static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; }
+    static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); }
+    static void free(GstMeta*, GstBuffer*) { }
+};
+
+void AppendPipeline::staticInitialization()
+{
+    ASSERT(WTF::isMainThread());
+
+    const char* tags[] = { nullptr };
+    s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
+    s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
+}
+
 static const char* dumpAppendState(AppendPipeline::AppendState appendState)
 {
     switch (appendState) {
@@ -68,11 +88,9 @@ static const char* dumpAppendState(AppendPipeline::AppendState appendState)
     }
 }
 
-static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);
 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
-static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);
 #if !LOG_DISABLED
 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
 #endif
@@ -110,14 +128,12 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     , m_playerPrivate(&playerPrivate)
     , m_id(0)
     , m_wasBusAlreadyNotifiedOfAvailableSamples(false)
-    , m_appsrcAtLeastABufferLeft(false)
-    , m_appsrcNeedDataReceived(false)
-    , m_appsrcDataLeavingProbeId(0)
     , m_appendState(AppendState::NotStarted)
     , m_abortPending(false)
     , m_streamType(Unknown)
 {
     ASSERT(WTF::isMainThread());
+    std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);
 
     GST_TRACE("Creating AppendPipeline (%p)", this);
 
@@ -137,6 +153,11 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     // below will already take the initial reference and we need an additional one for us.
     m_appsrc = gst_element_factory_make("appsrc", nullptr);
 
+    GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
+    gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) {
+        return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo);
+    }, this, nullptr);
+
     const String& type = m_sourceBufferPrivate->type().containerType();
     if (type.endsWith("mp4"))
         m_demux = gst_element_factory_make("qtdemux", nullptr);
@@ -154,8 +175,6 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
 
-    setAppsrcDataLeavingProbe();
-
 #if !LOG_DISABLED
     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
@@ -173,7 +192,6 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
 #endif
 
     // These signals won't be connected outside of the lifetime of "this".
-    g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);
     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
     g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(appendPipelineDemuxerNoMorePads), this);
@@ -213,7 +231,6 @@ AppendPipeline::~AppendPipeline()
     }
 
     if (m_appsrc) {
-        removeAppsrcDataLeavingProbe();
         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
         m_appsrc = nullptr;
     }
@@ -247,6 +264,27 @@ AppendPipeline::~AppendPipeline()
     m_demuxerSrcPadCaps = nullptr;
 };
 
+GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
+{
+    ASSERT(!WTF::isMainThread());
+    m_streamingThread = &WTF::Thread::current();
+
+    GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
+    ASSERT(GST_IS_BUFFER(buffer));
+
+    EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType));
+    if (!endOfAppendMeta) {
+        // Normal buffer, nothing to do.
+        return GST_PAD_PROBE_OK;
+    }
+
+    GST_TRACE_OBJECT(this, "posting end-of-append request to bus");
+    GstStructure* structure = gst_structure_new_empty("end-of-append");
+    GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
+    gst_bus_post(m_bus.get(), message);
+    return GST_PAD_PROBE_DROP;
+}
+
 void AppendPipeline::clearPlayerPrivate()
 {
     ASSERT(WTF::isMainThread());
@@ -285,16 +323,6 @@ void AppendPipeline::handleApplicationMessage(GstMessage* message)
 
     const GstStructure* structure = gst_message_get_structure(message);
 
-    if (gst_structure_has_name(structure, "appsrc-need-data")) {
-        handleAppsrcNeedDataReceived();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "appsrc-buffer-left")) {
-        handleAppsrcAtLeastABufferLeft();
-        return;
-    }
-
     if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
         GRefPtr<GstPad> demuxerSrcPad;
         gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
@@ -324,6 +352,11 @@ void AppendPipeline::handleApplicationMessage(GstMessage* message)
         return;
     }
 
+    if (gst_structure_has_name(structure, "end-of-append")) {
+        handleEndOfAppend();
+        return;
+    }
+
     ASSERT_NOT_REACHED();
 }
 
@@ -353,31 +386,6 @@ void AppendPipeline::handleStateChangeMessage(GstMessage* message)
     }
 }
 
-void AppendPipeline::handleAppsrcNeedDataReceived()
-{
-    if (!m_appsrcAtLeastABufferLeft) {
-        GST_TRACE("discarding until at least a buffer leaves appsrc");
-        return;
-    }
-
-    ASSERT(m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);
-    ASSERT(!m_appsrcNeedDataReceived);
-
-    GST_TRACE("received need-data from appsrc");
-
-    m_appsrcNeedDataReceived = true;
-    checkEndOfAppend();
-}
-
-void AppendPipeline::handleAppsrcAtLeastABufferLeft()
-{
-    m_appsrcAtLeastABufferLeft = true;
-    GST_TRACE("received buffer-left from appsrc");
-#if LOG_DISABLED
-    removeAppsrcDataLeavingProbe();
-#endif
-}
-
 gint AppendPipeline::id()
 {
     ASSERT(WTF::isMainThread());
@@ -438,7 +446,7 @@ void AppendPipeline::setAppendState(AppendState newAppendState)
         case AppendState::NotStarted:
             ok = true;
             if (m_pendingBuffer) {
-                GST_TRACE("pushing pending buffer %p", m_pendingBuffer.get());
+                GST_TRACE("pushing pending buffer %" GST_PTR_FORMAT, m_pendingBuffer.get());
                 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
                 nextAppendState = AppendState::Ongoing;
             }
@@ -603,24 +611,19 @@ void AppendPipeline::appsinkCapsChanged()
     }
 }
 
-void AppendPipeline::checkEndOfAppend()
+void AppendPipeline::handleEndOfAppend()
 {
     ASSERT(WTF::isMainThread());
+    GST_TRACE_OBJECT(this, "received end-of-append");
 
-    if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling))
-        return;
-
-    GST_TRACE("end of append data mark was received");
-
+    // Regardless of the state transition, the result is the same: didReceiveAllPendingSamples() is called.
     switch (m_appendState) {
     case AppendState::Ongoing:
         GST_TRACE("DataStarve");
-        m_appsrcNeedDataReceived = false;
         setAppendState(AppendState::DataStarve);
         break;
     case AppendState::Sampling:
         GST_TRACE("LastSample");
-        m_appsrcNeedDataReceived = false;
         setAppendState(AppendState::LastSample);
         break;
     default:
@@ -633,14 +636,6 @@ void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
 {
     ASSERT(WTF::isMainThread());
 
-    // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
-    if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
-        GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
-        // FIXME: Return ERROR and find a more robust way to detect that all the
-        // data has been processed, so we don't need to resort to these hacks.
-        return;
-    }
-
     if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
         GST_WARNING("Received sample without buffer from appsink.");
         return;
@@ -748,17 +743,12 @@ void AppendPipeline::consumeAppsinkAvailableSamples()
     }
 
     GST_TRACE_OBJECT(this, "batchedSampleCount = %d", batchedSampleCount);
-
-    if (batchedSampleCount > 0)
-        checkEndOfAppend();
 }
 
 void AppendPipeline::resetPipeline()
 {
     ASSERT(WTF::isMainThread());
     GST_DEBUG("resetting pipeline");
-    m_appsrcAtLeastABufferLeft = false;
-    setAppsrcDataLeavingProbe();
 
     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
     gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
@@ -774,29 +764,6 @@ void AppendPipeline::resetPipeline()
 
 }
 
-void AppendPipeline::setAppsrcDataLeavingProbe()
-{
-    if (m_appsrcDataLeavingProbeId)
-        return;
-
-    GST_TRACE("setting appsrc data leaving probe");
-
-    GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
-    m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);
-}
-
-void AppendPipeline::removeAppsrcDataLeavingProbe()
-{
-    if (!m_appsrcDataLeavingProbeId)
-        return;
-
-    GST_TRACE("removing appsrc data leaving probe");
-
-    GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
-    gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);
-    m_appsrcDataLeavingProbeId = 0;
-}
-
 void AppendPipeline::abort()
 {
     ASSERT(WTF::isMainThread());
@@ -816,39 +783,51 @@ void AppendPipeline::abort()
 
 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
 {
-    GstFlowReturn result;
-
     if (m_abortPending) {
         m_pendingBuffer = adoptGRef(buffer);
-        result = GST_FLOW_OK;
-    } else {
-        setAppendState(AppendPipeline::AppendState::Ongoing);
-        GST_TRACE("pushing new buffer %p", buffer);
-        result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer);
+        return GST_FLOW_OK;
     }
 
-    return result;
-}
+    setAppendState(AppendPipeline::AppendState::Ongoing);
 
-void AppendPipeline::reportAppsrcAtLeastABufferLeft()
-{
-    GST_TRACE("buffer left appsrc, reposting to bus");
-    GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left");
-    GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
-    gst_bus_post(m_bus.get(), message);
-}
+    GST_TRACE_OBJECT(this, "pushing data buffer %" GST_PTR_FORMAT, buffer);
+    GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer);
+    // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. Neither of these should
+    // be true at this point.
+    g_return_val_if_fail(pushDataBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);
 
-void AppendPipeline::reportAppsrcNeedDataReceived()
-{
-    GST_TRACE("received need-data signal at appsrc, reposting to bus");
-    GstStructure* structure = gst_structure_new_empty("appsrc-need-data");
-    GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
-    gst_bus_post(m_bus.get(), message);
+    // Push an additional empty buffer that marks the end of the append.
+    // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful
+    // completion of the append.
+    //
+    // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the
+    // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case
+    // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous
+    // buffer has completed.
+
+    GstBuffer* endOfAppendBuffer = gst_buffer_new();
+    gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr);
+
+    GST_TRACE_OBJECT(this, "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer);
+    GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer);
+    g_return_val_if_fail(pushEndOfAppendBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);
+
+    return GST_FLOW_OK;
 }
 
 GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
 {
     ASSERT(!WTF::isMainThread());
+    if (&WTF::Thread::current() != m_streamingThread) {
+        // m_streamingThreadId has been initialized in appsrcEndOfAppendCheckerProbe().
+        // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
+        // This error will only raise if someone modifies the pipeline to include more than one streaming thread or
+        // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken.
+        // AppendPipeline should have only one streaming thread. Otherwise we can't detect reliably when an appends has
+        // been demuxed completely.;
+        g_critical("Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run.");
+        RELEASE_ASSERT_NOT_REACHED();
+    }
 
     if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
         GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
@@ -1033,9 +1012,7 @@ void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
         didReceiveInitializationSegment();
         return;
     default:
-        // No useful data, but notify anyway to complete the append operation.
-        GST_DEBUG("Received all pending samples (no data)");
-        m_sourceBufferPrivate->didReceiveAllPendingSamples();
+        // No useful data.
         break;
     }
 
@@ -1078,20 +1055,6 @@ static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, A
     GST_TRACE("appsink-caps-changed message posted to bus");
 }
 
-static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)
-{
-    ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
-
-    GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
-    gsize bufferSize = gst_buffer_get_size(buffer);
-
-    GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);
-
-    appendPipeline->reportAppsrcAtLeastABufferLeft();
-
-    return GST_PAD_PROBE_OK;
-}
-
 #if !LOG_DISABLED
 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
 {
@@ -1131,11 +1094,6 @@ static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadP
     return GST_PAD_PROBE_DROP;
 }
 
-static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)
-{
-    appendPipeline->reportAppsrcNeedDataReceived();
-}
-
 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
 {
     appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
index 3660cb6..da4f75e 100644 (file)
@@ -29,7 +29,9 @@
 
 #include <atomic>
 #include <gst/gst.h>
+#include <mutex>
 #include <wtf/Condition.h>
+#include <wtf/Threading.h>
 
 namespace WebCore {
 
@@ -64,6 +66,7 @@ public:
     void appsinkCapsChanged();
     void appsinkNewSample(GRefPtr<GstSample>&&);
     void appsinkEOS();
+    void handleEndOfAppend();
     void didReceiveInitializationSegment();
     AtomicString trackId();
     void abort();
@@ -85,20 +88,25 @@ public:
     void connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad*);
     void connectDemuxerSrcPadToAppsink(GstPad*);
 
-    void reportAppsrcAtLeastABufferLeft();
-    void reportAppsrcNeedDataReceived();
-
 private:
     void resetPipeline();
     void checkEndOfAppend();
-    void handleAppsrcAtLeastABufferLeft();
-    void handleAppsrcNeedDataReceived();
-    void removeAppsrcDataLeavingProbe();
-    void setAppsrcDataLeavingProbe();
     void demuxerNoMorePads();
 
     void consumeAppsinkAvailableSamples();
 
+    GstPadProbeReturn appsrcEndOfAppendCheckerProbe(GstPadProbeInfo*);
+
+    static void staticInitialization();
+
+    static std::once_flag s_staticInitializationFlag;
+    static GType s_endOfAppendMetaType;
+    static const GstMetaInfo* s_webKitEndOfAppendMetaInfo;
+
+    // Used only for asserting that there is only one streaming thread.
+    // Only the pointers are compared.
+    WTF::Thread* m_streamingThread;
+
     Ref<MediaSourceClientGStreamerMSE> m_mediaSourceClient;
     Ref<SourceBufferPrivateGStreamer> m_sourceBufferPrivate;
     MediaPlayerPrivateGStreamerMSE* m_playerPrivate;
@@ -130,10 +138,6 @@ private:
     GRefPtr<GstCaps> m_demuxerSrcPadCaps;
     FloatSize m_presentationSize;
 
-    bool m_appsrcAtLeastABufferLeft;
-    bool m_appsrcNeedDataReceived;
-
-    gulong m_appsrcDataLeavingProbeId;
 #if !LOG_DISABLED
     struct PadProbeInformation m_demuxerDataEnteringPadProbeInformation;
     struct PadProbeInformation m_appsinkDataEnteringPadProbeInformation;