[MSE][GStreamer] Introduce AbortableTaskQueue
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
index 839da62..1b7b4a2 100644 (file)
@@ -32,6 +32,7 @@
 #include "MediaDescription.h"
 #include "SourceBufferPrivateGStreamer.h"
 #include "VideoTrackPrivateGStreamer.h"
+#include <functional>
 #include <gst/app/gstappsink.h>
 #include <gst/app/gstappsrc.h>
 #include <gst/gst.h>
@@ -59,14 +60,14 @@ struct EndOfAppendMeta {
 
 void AppendPipeline::staticInitialization()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     const char* tags[] = { nullptr };
     s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
     s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
 }
 
-static const char* dumpAppendState(AppendPipeline::AppendState appendState)
+const char* AppendPipeline::dumpAppendState(AppendPipeline::AppendState appendState)
 {
     switch (appendState) {
     case AppendPipeline::AppendState::Invalid:
@@ -88,9 +89,6 @@ static const char* dumpAppendState(AppendPipeline::AppendState appendState)
     }
 }
 
-static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
-static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
-static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
 #if !LOG_DISABLED
 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
 #endif
@@ -100,28 +98,9 @@ static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbe
 #endif
 
 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
-static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
-static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
-static void appendPipelineDemuxerNoMorePads(GstElement*, AppendPipeline*);
 
 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
 
-static void appendPipelineNeedContextMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
-{
-    GST_TRACE("received callback");
-    appendPipeline->handleNeedContextSyncMessage(message);
-}
-
-static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
-{
-    appendPipeline->handleApplicationMessage(message);
-}
-
-static void appendPipelineStateChangeMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
-{
-    appendPipeline->handleStateChangeMessage(message);
-}
-
 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
     : m_mediaSourceClient(mediaSourceClient.get())
     , m_sourceBufferPrivate(sourceBufferPrivate.get())
@@ -132,7 +111,7 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     , m_abortPending(false)
     , m_streamType(Unknown)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);
 
     GST_TRACE("Creating AppendPipeline (%p)", this);
@@ -148,9 +127,12 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
     gst_bus_enable_sync_message_emission(m_bus.get());
 
-    g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(appendPipelineNeedContextMessageCallback), this);
-    g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
-    g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(appendPipelineStateChangeMessageCallback), this);
+    g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
+        appendPipeline->handleNeedContextSyncMessage(message);
+    }), this);
+    g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
+        appendPipeline->handleStateChangeMessage(message);
+    }), this);
 
     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
     // below will already take the initial reference and we need an additional one for us.
@@ -176,7 +158,11 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);
 
     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
-    g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
+    g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) {
+        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
+            appendPipeline->appsinkCapsChanged();
+        });
+    }), this);
 
 #if !LOG_DISABLED
     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
@@ -195,11 +181,29 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
 #endif
 
     // These signals won't be connected outside of the lifetime of "this".
-    g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
-    g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
-    g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(appendPipelineDemuxerNoMorePads), this);
-    g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
-    g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
+    g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
+        appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad);
+    }), this);
+    g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
+        appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
+    }), this);
+    g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
+        ASSERT(!isMainThread());
+        GST_DEBUG("Posting no-more-pads task to main thread");
+        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
+            appendPipeline->demuxerNoMorePads();
+        });
+    }), this);
+    g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) {
+        appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
+    }), this);
+    g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
+        ASSERT(!isMainThread());
+        GST_DEBUG("Posting appsink-eos task to main thread");
+        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
+            appendPipeline->appsinkEOS();
+        });
+    }), this);
 
     // Add_many will take ownership of a reference. That's why we used an assignment before.
     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
@@ -210,15 +214,11 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
 
 AppendPipeline::~AppendPipeline()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     setAppendState(AppendState::Invalid);
-
-    {
-        LockHolder locker(m_padAddRemoveLock);
-        m_playerPrivate = nullptr;
-        m_padAddRemoveCondition.notifyOne();
-    }
+    // Forget all pending tasks and unblock the streaming thread if it was blocked.
+    m_taskQueue.startAborting();
 
     GST_TRACE("Destroying AppendPipeline (%p)", this);
 
@@ -226,7 +226,7 @@ AppendPipeline::~AppendPipeline()
 
     if (m_pipeline) {
         ASSERT(m_bus);
-        g_signal_handlers_disconnect_by_func(m_bus.get(), reinterpret_cast<gpointer>(appendPipelineNeedContextMessageCallback), this);
+        g_signal_handlers_disconnect_by_data(m_bus.get(), this);
         gst_bus_disable_sync_message_emission(m_bus.get());
         gst_bus_remove_signal_watch(m_bus.get());
         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
@@ -269,7 +269,7 @@ AppendPipeline::~AppendPipeline()
 
 GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
 {
-    ASSERT(!WTF::isMainThread());
+    ASSERT(!isMainThread());
     m_streamingThread = &WTF::Thread::current();
 
     GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
@@ -281,27 +281,22 @@ GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo*
         return GST_PAD_PROBE_OK;
     }
 
-    GST_TRACE_OBJECT(m_pipeline.get(), "posting end-of-append request to bus");
-    GstStructure* structure = gst_structure_new_empty("end-of-append");
-    GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
-    gst_bus_post(m_bus.get(), message);
+    GST_TRACE_OBJECT(m_pipeline.get(), "Posting end-of-append task to the main thread");
+    m_taskQueue.enqueueTask([this]() {
+        handleEndOfAppend();
+    });
     return GST_PAD_PROBE_DROP;
 }
 
 void AppendPipeline::clearPlayerPrivate()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_DEBUG("cleaning private player");
 
     // Make sure that AppendPipeline won't process more data from now on and
     // instruct handleNewSample to abort itself from now on as well.
     setAppendState(AppendState::Invalid);
-
-    {
-        LockHolder locker(m_padAddRemoveLock);
-        m_playerPrivate = nullptr;
-        m_padAddRemoveCondition.notifyOne();
-    }
+    m_taskQueue.startAborting();
 
     // And now that no handleNewSample operations will remain stalled waiting
     // for the main thread, stop the pipeline.
@@ -320,49 +315,6 @@ void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
         m_playerPrivate->handleSyncMessage(message);
 }
 
-void AppendPipeline::handleApplicationMessage(GstMessage* message)
-{
-    ASSERT(WTF::isMainThread());
-
-    const GstStructure* structure = gst_message_get_structure(message);
-
-    if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
-        GRefPtr<GstPad> demuxerSrcPad;
-        gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
-        ASSERT(demuxerSrcPad);
-        connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "appsink-caps-changed")) {
-        appsinkCapsChanged();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "appsink-new-sample")) {
-        m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
-        consumeAppsinkAvailableSamples();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "appsink-eos")) {
-        appsinkEOS();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "demuxer-no-more-pads")) {
-        demuxerNoMorePads();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "end-of-append")) {
-        handleEndOfAppend();
-        return;
-    }
-
-    ASSERT_NOT_REACHED();
-}
-
 void AppendPipeline::demuxerNoMorePads()
 {
     GST_TRACE("calling didReceiveInitializationSegment");
@@ -373,7 +325,7 @@ void AppendPipeline::demuxerNoMorePads()
 
 void AppendPipeline::handleStateChangeMessage(GstMessage* message)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
         GstState currentState, newState;
@@ -391,7 +343,7 @@ void AppendPipeline::handleStateChangeMessage(GstMessage* message)
 
 gint AppendPipeline::id()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (m_id)
         return m_id;
@@ -424,7 +376,7 @@ gint AppendPipeline::id()
 
 void AppendPipeline::setAppendState(AppendState newAppendState)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     // Valid transitions:
     // NotStarted-->Ongoing-->DataStarve-->NotStarted
     //           |         |            `->Aborting-->NotStarted
@@ -564,7 +516,7 @@ void AppendPipeline::setAppendState(AppendState newAppendState)
 
 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
@@ -592,7 +544,7 @@ void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
 
 void AppendPipeline::appsinkCapsChanged()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (!m_appsink)
         return;
@@ -616,7 +568,7 @@ void AppendPipeline::appsinkCapsChanged()
 
 void AppendPipeline::handleEndOfAppend()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_TRACE_OBJECT(m_pipeline.get(), "received end-of-append");
 
     // Regardless of the state transition, the result is the same: didReceiveAllPendingSamples() is called.
@@ -637,7 +589,7 @@ void AppendPipeline::handleEndOfAppend()
 
 void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
         GST_WARNING("Received sample without buffer from appsink.");
@@ -673,7 +625,7 @@ void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
 
 void AppendPipeline::appsinkEOS()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     switch (m_appendState) {
     case AppendState::Aborting:
@@ -694,7 +646,7 @@ void AppendPipeline::appsinkEOS()
 
 void AppendPipeline::didReceiveInitializationSegment()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
 
@@ -726,7 +678,7 @@ void AppendPipeline::didReceiveInitializationSegment()
 
 AtomicString AppendPipeline::trackId()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (!m_track)
         return AtomicString();
@@ -736,7 +688,7 @@ AtomicString AppendPipeline::trackId()
 
 void AppendPipeline::consumeAppsinkAvailableSamples()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     GRefPtr<GstSample> sample;
     int batchedSampleCount = 0;
@@ -750,7 +702,7 @@ void AppendPipeline::consumeAppsinkAvailableSamples()
 
 void AppendPipeline::resetPipeline()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_DEBUG("resetting pipeline");
 
     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
@@ -769,7 +721,7 @@ void AppendPipeline::resetPipeline()
 
 void AppendPipeline::abort()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_DEBUG("aborting");
 
     m_pendingBuffer = nullptr;
@@ -818,9 +770,9 @@ GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
     return GST_FLOW_OK;
 }
 
-GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
+GstFlowReturn AppendPipeline::handleAppsinkNewSampleFromStreamingThread(GstElement*)
 {
-    ASSERT(!WTF::isMainThread());
+    ASSERT(!isMainThread());
     if (&WTF::Thread::current() != m_streamingThread) {
         // m_streamingThreadId has been initialized in appsrcEndOfAppendCheckerProbe().
         // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
@@ -838,10 +790,11 @@ GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
     }
 
     if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
-        GstStructure* structure = gst_structure_new_empty("appsink-new-sample");
-        GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
-        gst_bus_post(m_bus.get(), message);
-        GST_TRACE("appsink-new-sample message posted to bus");
+        GST_TRACE("Posting appsink-new-sample task to the main thread");
+        m_taskQueue.enqueueTask([this]() {
+            m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
+            consumeAppsinkAvailableSamples();
+        });
     }
 
     return GST_FLOW_OK;
@@ -879,8 +832,9 @@ createOptionalParserForFormat(GstPad* demuxerSrcPad)
     return nullptr;
 }
 
-void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
+void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad)
 {
+    ASSERT(!isMainThread());
     if (!m_appsink)
         return;
 
@@ -905,23 +859,14 @@ void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerS
     else
         m_initialDuration = MediaTime::positiveInfiniteTime();
 
-    if (WTF::isMainThread())
+    GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread");
+    auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() {
         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
-    else {
-        // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
-        LockHolder locker(m_padAddRemoveLock);
-        if (!m_playerPrivate)
-            return;
-
-        GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
-        GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
-        gst_bus_post(m_bus.get(), message);
-        GST_TRACE("demuxer-connect-to-appsink message posted to bus");
-
-        m_padAddRemoveCondition.wait(m_padAddRemoveLock);
-
-        if (!m_playerPrivate)
-            return;
+        return AbortableTaskQueue::Void();
+    });
+    if (!response) {
+        // The AppendPipeline has been destroyed or aborted before we received a response.
+        return;
     }
 
     // Must be done in the thread we were called from (usually streaming thread).
@@ -966,14 +911,13 @@ void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerS
 
 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_DEBUG("Connecting to appsink");
 
     const String& type = m_sourceBufferPrivate->type().containerType();
     if (type.endsWith("webm"))
         gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);
 
-    LockHolder locker(m_padAddRemoveLock);
     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
 
     // Only one stream per demuxer is supported.
@@ -982,7 +926,6 @@ void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
 
     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
-        m_padAddRemoveCondition.notifyOne();
         return;
     }
 
@@ -1018,8 +961,6 @@ void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
         // This is going to cause an error which will detach the SourceBuffer and tear down this
         // AppendPipeline, so we need the padAddRemove lock released before continuing.
         m_track = nullptr;
-        m_padAddRemoveCondition.notifyOne();
-        locker.unlockEarly();
         didReceiveInitializationSegment();
         return;
     default:
@@ -1030,12 +971,13 @@ void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
     m_appsinkCaps = WTFMove(caps);
     if (m_playerPrivate)
         m_playerPrivate->trackDetected(this, m_track, true);
-
-    m_padAddRemoveCondition.notifyOne();
 }
 
 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
 {
+    // Note: This function can be called either from the streaming thread (e.g. if a strange initialization segment with
+    // incompatible tracks is appended and the srcpad disconnected) or -- more usually -- from the main thread, when
+    // a state change is made to bring the demuxer down. (State change operations run in the main thread.)
     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
 
     GST_DEBUG("Disconnecting appsink");
@@ -1049,23 +991,6 @@ void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
 }
 
-void AppendPipeline::appendPipelineDemuxerNoMorePadsFromAnyThread()
-{
-    GST_TRACE("appendPipelineDemuxerNoMorePadsFromAnyThread");
-    GstStructure* structure = gst_structure_new_empty("demuxer-no-more-pads");
-    GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
-    gst_bus_post(m_bus.get(), message);
-    GST_TRACE("appendPipelineDemuxerNoMorePadsFromAnyThread - posted to bus");
-}
-
-static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
-{
-    GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
-    GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
-    gst_bus_post(appendPipeline->bus(), message);
-    GST_TRACE("appsink-caps-changed message posted to bus");
-}
-
 #if !LOG_DISABLED
 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
 {
@@ -1105,40 +1030,6 @@ static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadP
     return GST_PAD_PROBE_DROP;
 }
 
-static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
-{
-    appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
-}
-
-static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
-{
-    appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
-}
-
-static void appendPipelineDemuxerNoMorePads(GstElement*, AppendPipeline* appendPipeline)
-{
-    appendPipeline->appendPipelineDemuxerNoMorePadsFromAnyThread();
-}
-
-static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
-{
-    return appendPipeline->handleNewAppsinkSample(appsink);
-}
-
-static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
-{
-    if (WTF::isMainThread())
-        appendPipeline->appsinkEOS();
-    else {
-        GstStructure* structure = gst_structure_new_empty("appsink-eos");
-        GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
-        gst_bus_post(appendPipeline->bus(), message);
-        GST_TRACE("appsink-eos message posted to bus");
-    }
-
-    GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
-}
-
 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
 {
     // matroskademux sets GstSegment.start to the PTS of the first frame.