[MSE][GStreamer] Introduce AbortableTaskQueue
authoraboya@igalia.com <aboya@igalia.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Mon, 12 Nov 2018 14:40:26 +0000 (14:40 +0000)
committeraboya@igalia.com <aboya@igalia.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Mon, 12 Nov 2018 14:40:26 +0000 (14:40 +0000)
https://bugs.webkit.org/show_bug.cgi?id=190902

Reviewed by Xabier Rodriguez-Calvar.

Source/WebCore:

A new synchronization primitive is introduced: AbortableTaskQueue,
which allows to send work to the main thread from a background thread
with the option to perform two-phase cancellation (startAborting() and
finishAborting()).

This new primitive has been used to overhaul GstBus messaging in
AppendPipeline. A lot of code made redundant has been deleted in the
process and lots of internal functions were now able to be made
private. As part of the refactor all glib signals in AppendPipeline
now use lambdas. All usages of WTF::isMainThread() in AppendPipeline
have been replaced by isMainThread() for consistency with the rest of
WebKit.

Two-phase cancellation is still not used in AppendPipeline as of this
patch, but it will be used in a future patch that makes use of
GStreamer flushes to implement correct MSE abort semantics. There are
unit tests to ensure it works correctly, even if it's still not used.

* WebCore.xcodeproj/project.pbxproj:
* platform/AbortableTaskQueue.h: Added.
* platform/graphics/gstreamer/mse/AppendPipeline.cpp:
(WebCore::AppendPipeline::dumpAppendState):
(WebCore::AppendPipeline::AppendPipeline):
(WebCore::AppendPipeline::~AppendPipeline):
(WebCore::AppendPipeline::appsrcEndOfAppendCheckerProbe):
(WebCore::AppendPipeline::handleAppsinkNewSampleFromAnyThread):
(WebCore::AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread):
* platform/graphics/gstreamer/mse/AppendPipeline.h:
(WebCore::AppendPipeline::sourceBufferPrivate):
(WebCore::AppendPipeline::appsinkCaps):
(WebCore::AppendPipeline::track):
(WebCore::AppendPipeline::demuxerSrcPadCaps):
(WebCore::AppendPipeline::playerPrivate):

Tools:

Tests for AbortableTaskQueue are included.

* TestWebKitAPI/TestWebKitAPI.xcodeproj/project.pbxproj:
* TestWebKitAPI/PlatformGTK.cmake:
* TestWebKitAPI/Tests/WebCore/AbortableTaskQueue.cpp: Added.
(TestWebKitAPI::TEST):
(TestWebKitAPI::FancyResponse::FancyResponse):
(TestWebKitAPI::FancyResponse::operator=):
(TestWebKitAPI::DeterministicScheduler::DeterministicScheduler):
(TestWebKitAPI::DeterministicScheduler::ThreadContext::ThreadContext):
(TestWebKitAPI::DeterministicScheduler::ThreadContext::waitMyTurn):
(TestWebKitAPI::DeterministicScheduler::ThreadContext::yieldToThread):

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@238084 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Source/WebCore/ChangeLog
Source/WebCore/WebCore.xcodeproj/project.pbxproj
Source/WebCore/platform/AbortableTaskQueue.h [new file with mode: 0644]
Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp
Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.h
Tools/ChangeLog
Tools/TestWebKitAPI/PlatformGTK.cmake
Tools/TestWebKitAPI/TestWebKitAPI.xcodeproj/project.pbxproj
Tools/TestWebKitAPI/Tests/WebCore/AbortableTaskQueue.cpp [new file with mode: 0644]

index f505136..1c9f37c 100644 (file)
@@ -1,3 +1,44 @@
+2018-11-12  Alicia Boya García  <aboya@igalia.com>
+
+        [MSE][GStreamer] Introduce AbortableTaskQueue
+        https://bugs.webkit.org/show_bug.cgi?id=190902
+
+        Reviewed by Xabier Rodriguez-Calvar.
+
+        A new synchronization primitive is introduced: AbortableTaskQueue,
+        which allows to send work to the main thread from a background thread
+        with the option to perform two-phase cancellation (startAborting() and
+        finishAborting()).
+
+        This new primitive has been used to overhaul GstBus messaging in
+        AppendPipeline. A lot of code made redundant has been deleted in the
+        process and lots of internal functions were now able to be made
+        private. As part of the refactor all glib signals in AppendPipeline
+        now use lambdas. All usages of WTF::isMainThread() in AppendPipeline
+        have been replaced by isMainThread() for consistency with the rest of
+        WebKit.
+
+        Two-phase cancellation is still not used in AppendPipeline as of this
+        patch, but it will be used in a future patch that makes use of
+        GStreamer flushes to implement correct MSE abort semantics. There are
+        unit tests to ensure it works correctly, even if it's still not used.
+
+        * WebCore.xcodeproj/project.pbxproj:
+        * platform/AbortableTaskQueue.h: Added.
+        * platform/graphics/gstreamer/mse/AppendPipeline.cpp:
+        (WebCore::AppendPipeline::dumpAppendState):
+        (WebCore::AppendPipeline::AppendPipeline):
+        (WebCore::AppendPipeline::~AppendPipeline):
+        (WebCore::AppendPipeline::appsrcEndOfAppendCheckerProbe):
+        (WebCore::AppendPipeline::handleAppsinkNewSampleFromAnyThread):
+        (WebCore::AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread):
+        * platform/graphics/gstreamer/mse/AppendPipeline.h:
+        (WebCore::AppendPipeline::sourceBufferPrivate):
+        (WebCore::AppendPipeline::appsinkCaps):
+        (WebCore::AppendPipeline::track):
+        (WebCore::AppendPipeline::demuxerSrcPadCaps):
+        (WebCore::AppendPipeline::playerPrivate):
+
 2018-11-12  Xabier Rodriguez Calvar  <calvaris@igalia.com>
 
         [GStreamer][EME] waitingforkey event should consider decryptors' waiting status
index 2ea48f1..79495d7 100644 (file)
                46B9519A207D635400A7D2DD /* RemoteFrame.h in Headers */ = {isa = PBXBuildFile; fileRef = 46B95192207D632E00A7D2DD /* RemoteFrame.h */; settings = {ATTRIBUTES = (Private, ); }; };
                46BCBBC22085008F00710638 /* JSRemoteDOMWindowBase.h in Headers */ = {isa = PBXBuildFile; fileRef = 46BCBBC02085007F00710638 /* JSRemoteDOMWindowBase.h */; settings = {ATTRIBUTES = (Private, ); }; };
                46C376622085177D00C73829 /* JSRemoteDOMWindow.h in Headers */ = {isa = PBXBuildFile; fileRef = 46C376612085176D00C73829 /* JSRemoteDOMWindow.h */; };
+               CD5F3EDD9D333C40A9A38A54 /* AbortableTaskQueue.h in Headers */ = {isa = PBXBuildFile; fileRef = DFDB912CF8E88A6DA1AD264F /* AbortableTaskQueue.h */; settings = {ATTRIBUTES = (Private, ); }; };
                46C696CB1E7205F700597937 /* CPUMonitor.h in Headers */ = {isa = PBXBuildFile; fileRef = 46C696C91E7205E400597937 /* CPUMonitor.h */; settings = {ATTRIBUTES = (Private, ); }; };
                46C696CC1E7205FC00597937 /* CPUMonitor.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 46C696CA1E7205E400597937 /* CPUMonitor.cpp */; };
                46C83EFE1A9BBE2900A79A41 /* GeoNotifier.h in Headers */ = {isa = PBXBuildFile; fileRef = 46C83EFC1A9BBE2900A79A41 /* GeoNotifier.h */; settings = {ATTRIBUTES = (Private, ); }; };
                46BCBBC3208500A700710638 /* RemoteDOMWindow.idl */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = RemoteDOMWindow.idl; sourceTree = "<group>"; };
                46C3765F2085176C00C73829 /* JSRemoteDOMWindow.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = JSRemoteDOMWindow.cpp; sourceTree = "<group>"; };
                46C376612085176D00C73829 /* JSRemoteDOMWindow.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = JSRemoteDOMWindow.h; sourceTree = "<group>"; };
+               DFDB912CF8E88A6DA1AD264F /* AbortableTaskQueue.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = AbortableTaskQueue.h; sourceTree = "<group>"; };
                46C696C91E7205E400597937 /* CPUMonitor.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = CPUMonitor.h; sourceTree = "<group>"; };
                46C696CA1E7205E400597937 /* CPUMonitor.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = CPUMonitor.cpp; sourceTree = "<group>"; };
                46C83EFB1A9BBE2900A79A41 /* GeoNotifier.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = GeoNotifier.cpp; sourceTree = "<group>"; };
                                1A2E6E560CC551E0004A2062 /* sql */,
                                B2C3D9EC0D006C1D00EF6F26 /* text */,
                                E188235F2031F50F00B42DF3 /* vr */,
+                               DFDB912CF8E88A6DA1AD264F /* AbortableTaskQueue.h */,
                                49AE2D94134EE5F90072920A /* CalculationValue.cpp */,
                                49AE2D95134EE5F90072920A /* CalculationValue.h */,
                                C330A22113EC196B0000B45B /* ColorChooser.h */,
diff --git a/Source/WebCore/platform/AbortableTaskQueue.h b/Source/WebCore/platform/AbortableTaskQueue.h
new file mode 100644 (file)
index 0000000..78a37f8
--- /dev/null
@@ -0,0 +1,235 @@
+/*
+ * Copyright (C) 2018 Igalia, S.L.
+ * Copyright (C) 2018 Metrological Group B.V.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public License
+ * aint with this library; see the file COPYING.LIB.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#pragma once
+
+#include <wtf/Condition.h>
+#include <wtf/Deque.h>
+#include <wtf/Function.h>
+#include <wtf/Lock.h>
+#include <wtf/RunLoop.h>
+#include <wtf/StdLibExtras.h>
+
+namespace WebCore {
+
+/* AbortableTaskQueue is a high-level synchronization object for cases where abortable work is done in
+ * background thread(s) that sometimes needs to post tasks to the main thread.
+ *
+ * The tasks posted by the background thread(s) to the main thread may be asynchronous, using enqueueTask(),
+ * which returns immediately; or synchronous, using enqueueTaskAndWait(), which blocks the calling
+ * background thread until the task is run by the main thread (possibly returning a value).
+ *
+ * What makes AbortableTaskQueue different from other task queueing mechanisms is that it provides a two-phase
+ * protocol for aborting the work in the background thread in presence of queued tasks without deadlocks or
+ * late notification bugs.
+ *
+ * Without a two-phase design deadlocks would occur when attempting an abort if a background thread was
+ * blocked in a synchronous task and needed to return from there for the abort to be handled. Also, without
+ * a design like this, tasks already enqueued at that point or soon thereafter until the abort is complete
+ * would still be handled by the main thread, even though we don't want to anymore.
+ *
+ * Aborting background processing with AbortableTaskQueue is a several step process:
+ *
+ *  1. Call abortableTaskQueue.startAborting() -- This will make any current or future (until further notice)
+ *     synchronous tasks fail immediately, so that we don't deadlock in the next step. Also, tasks of any kind
+ *     already enqueued will not be run.
+ *
+ *  2. Send the abort signal to the background threads. This is completely application specific. For instance,
+ *     in the AppendPipeline case you would flush or reset the GStreamer pipeline here. Wait until all the
+ *     background threads have finished aborting.
+ *
+ *  3. Call abortableTaskQueue.finishAborting() -- This will allow new tasks queued from this point on to be
+ *     handled just as before the abort was made.
+ *
+ *  4. After this, the background thread(s) can be put to work again safely.
+ *
+ * This class is used for handling demuxer events in AppendPipeline, taking into account demuxing can be
+ * aborted at any moment if SourceBuffer.abort() is called or the SourceBuffer is destroyed. */
+class AbortableTaskQueue final {
+    WTF_MAKE_NONCOPYABLE(AbortableTaskQueue);
+public:
+    AbortableTaskQueue()
+    {
+        ASSERT(isMainThread());
+    }
+
+    ~AbortableTaskQueue()
+    {
+        ASSERT(isMainThread());
+        ASSERT(!m_mutex.isHeld());
+        ASSERT(m_channel.isEmpty());
+    }
+
+    // ===========================
+    // Methods for the main thread
+    // ===========================
+
+    // Starts an abort process.
+    //
+    // Tasks already queued will be discarded.
+    //
+    // Until finishAborting is called, all present and future calls to enqueueTaskAndWait() will immediately
+    // return an empty optional.
+    //
+    // This method is idempotent.
+    void startAborting()
+    {
+        ASSERT(isMainThread());
+
+        {
+            LockHolder lockHolder(m_mutex);
+            m_aborting = true;
+            cancelAllTasks();
+        }
+        m_abortedOrResponseSet.notifyAll();
+    }
+
+    // Declares the previous abort finished.
+    //
+    // In order to avoid race conditions the background threads must be unable to post tasks at this point.
+    void finishAborting()
+    {
+        ASSERT(isMainThread());
+
+        LockHolder lockHolder(m_mutex);
+        ASSERT(m_aborting);
+        m_aborting = false;
+    }
+
+    // ==================================
+    // Methods for the background threads
+    // ==================================
+
+    // Enqueue a task to be run on the main thread. The task may be cancelled if an abort starts before it's
+    // handled.
+    void enqueueTask(WTF::Function<void()>&& mainThreadTaskHandler)
+    {
+        ASSERT(!isMainThread());
+
+        LockHolder lockHolder(m_mutex);
+        if (m_aborting)
+            return;
+
+        postTask(WTFMove(mainThreadTaskHandler));
+    }
+
+    // Enqueue a task to be run on the main thread and wait for it to return. The return value of the task is
+    // forwarded to the background thread, wrapped in an optional.
+    //
+    // If we are aborting, the call finishes immediately, returning an empty optional.
+    template<typename R>
+    std::optional<R> enqueueTaskAndWait(WTF::Function<R()>&& mainThreadTaskHandler)
+    {
+        // Don't deadlock the main thread with itself.
+        ASSERT(!isMainThread());
+
+        LockHolder lockHolder(m_mutex);
+        if (m_aborting)
+            return std::nullopt;
+
+        std::optional<R> response = std::nullopt;
+        postTask([this, &response, &mainThreadTaskHandler]() {
+            R responseValue = mainThreadTaskHandler();
+            LockHolder lockHolder(m_mutex);
+            response = WTFMove(responseValue);
+            m_abortedOrResponseSet.notifyAll();
+        });
+        m_abortedOrResponseSet.wait(m_mutex, [this, &response]() {
+            return m_aborting || response;
+        });
+        return response;
+    }
+
+    // This is class is provided for convenience when you want to use enqueueTaskAndWait() but
+    // you don't need any particular data from the main thread in return and just knowing that it finished
+    // running the handler function is enough.
+    class Void { };
+
+private:
+    // Protected state:
+    //   Main thread: read-write. Writes must be made with the lock.
+    //   Background threads: read only. Reads must be made with the lock.
+    class Task : public ThreadSafeRefCounted<Task> {
+        WTF_MAKE_NONCOPYABLE(Task);
+        WTF_MAKE_FAST_ALLOCATED(Task);
+    public:
+        static Ref<Task> create(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback)
+        {
+            return adoptRef(*new Task(taskQueue, WTFMove(taskCallback)));
+        }
+
+        bool isCancelled() const
+        {
+            return !m_taskQueue;
+        }
+
+        void cancel()
+        {
+            ASSERT(!isCancelled());
+            m_taskCallback = nullptr;
+            m_taskQueue = nullptr;
+        }
+
+        void dispatch()
+        {
+            ASSERT(isMainThread());
+            if (isCancelled())
+                return;
+
+            LockHolder lock(m_taskQueue->m_mutex);
+            ASSERT(this == m_taskQueue->m_channel.first().ptr());
+            m_taskQueue->m_channel.removeFirst();
+            lock.unlockEarly();
+            m_taskCallback();
+        }
+
+    private:
+        AbortableTaskQueue* m_taskQueue;
+        WTF::Function<void()> m_taskCallback;
+
+        Task(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback)
+            : m_taskQueue(taskQueue), m_taskCallback(WTFMove(taskCallback))
+        { }
+    };
+
+    void postTask(WTF::Function<void()>&& callback)
+    {
+        ASSERT(m_mutex.isHeld());
+        Ref<Task> task = Task::create(this, WTFMove(callback));
+        m_channel.append(task.copyRef());
+        RunLoop::main().dispatch([task = WTFMove(task)]() { task->dispatch(); });
+    }
+
+    void cancelAllTasks()
+    {
+        ASSERT(isMainThread());
+        ASSERT(m_mutex.isHeld());
+        for (Ref<Task>& task : m_channel)
+            task->cancel();
+        m_channel.clear();
+    }
+
+    bool m_aborting { false };
+    Lock m_mutex;
+    Condition m_abortedOrResponseSet;
+    WTF::Deque<Ref<Task>> m_channel;
+};
+
+} // namespace WebCore
index 839da62..1b7b4a2 100644 (file)
@@ -32,6 +32,7 @@
 #include "MediaDescription.h"
 #include "SourceBufferPrivateGStreamer.h"
 #include "VideoTrackPrivateGStreamer.h"
+#include <functional>
 #include <gst/app/gstappsink.h>
 #include <gst/app/gstappsrc.h>
 #include <gst/gst.h>
@@ -59,14 +60,14 @@ struct EndOfAppendMeta {
 
 void AppendPipeline::staticInitialization()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     const char* tags[] = { nullptr };
     s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
     s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
 }
 
-static const char* dumpAppendState(AppendPipeline::AppendState appendState)
+const char* AppendPipeline::dumpAppendState(AppendPipeline::AppendState appendState)
 {
     switch (appendState) {
     case AppendPipeline::AppendState::Invalid:
@@ -88,9 +89,6 @@ static const char* dumpAppendState(AppendPipeline::AppendState appendState)
     }
 }
 
-static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
-static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
-static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
 #if !LOG_DISABLED
 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
 #endif
@@ -100,28 +98,9 @@ static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbe
 #endif
 
 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
-static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
-static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
-static void appendPipelineDemuxerNoMorePads(GstElement*, AppendPipeline*);
 
 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
 
-static void appendPipelineNeedContextMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
-{
-    GST_TRACE("received callback");
-    appendPipeline->handleNeedContextSyncMessage(message);
-}
-
-static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
-{
-    appendPipeline->handleApplicationMessage(message);
-}
-
-static void appendPipelineStateChangeMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
-{
-    appendPipeline->handleStateChangeMessage(message);
-}
-
 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
     : m_mediaSourceClient(mediaSourceClient.get())
     , m_sourceBufferPrivate(sourceBufferPrivate.get())
@@ -132,7 +111,7 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     , m_abortPending(false)
     , m_streamType(Unknown)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);
 
     GST_TRACE("Creating AppendPipeline (%p)", this);
@@ -148,9 +127,12 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
     gst_bus_enable_sync_message_emission(m_bus.get());
 
-    g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(appendPipelineNeedContextMessageCallback), this);
-    g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
-    g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(appendPipelineStateChangeMessageCallback), this);
+    g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
+        appendPipeline->handleNeedContextSyncMessage(message);
+    }), this);
+    g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
+        appendPipeline->handleStateChangeMessage(message);
+    }), this);
 
     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
     // below will already take the initial reference and we need an additional one for us.
@@ -176,7 +158,11 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
     gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);
 
     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
-    g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
+    g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) {
+        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
+            appendPipeline->appsinkCapsChanged();
+        });
+    }), this);
 
 #if !LOG_DISABLED
     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
@@ -195,11 +181,29 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
 #endif
 
     // These signals won't be connected outside of the lifetime of "this".
-    g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
-    g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
-    g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(appendPipelineDemuxerNoMorePads), this);
-    g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
-    g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
+    g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
+        appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad);
+    }), this);
+    g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
+        appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
+    }), this);
+    g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
+        ASSERT(!isMainThread());
+        GST_DEBUG("Posting no-more-pads task to main thread");
+        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
+            appendPipeline->demuxerNoMorePads();
+        });
+    }), this);
+    g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) {
+        appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
+    }), this);
+    g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
+        ASSERT(!isMainThread());
+        GST_DEBUG("Posting appsink-eos task to main thread");
+        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
+            appendPipeline->appsinkEOS();
+        });
+    }), this);
 
     // Add_many will take ownership of a reference. That's why we used an assignment before.
     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
@@ -210,15 +214,11 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
 
 AppendPipeline::~AppendPipeline()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     setAppendState(AppendState::Invalid);
-
-    {
-        LockHolder locker(m_padAddRemoveLock);
-        m_playerPrivate = nullptr;
-        m_padAddRemoveCondition.notifyOne();
-    }
+    // Forget all pending tasks and unblock the streaming thread if it was blocked.
+    m_taskQueue.startAborting();
 
     GST_TRACE("Destroying AppendPipeline (%p)", this);
 
@@ -226,7 +226,7 @@ AppendPipeline::~AppendPipeline()
 
     if (m_pipeline) {
         ASSERT(m_bus);
-        g_signal_handlers_disconnect_by_func(m_bus.get(), reinterpret_cast<gpointer>(appendPipelineNeedContextMessageCallback), this);
+        g_signal_handlers_disconnect_by_data(m_bus.get(), this);
         gst_bus_disable_sync_message_emission(m_bus.get());
         gst_bus_remove_signal_watch(m_bus.get());
         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
@@ -269,7 +269,7 @@ AppendPipeline::~AppendPipeline()
 
 GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
 {
-    ASSERT(!WTF::isMainThread());
+    ASSERT(!isMainThread());
     m_streamingThread = &WTF::Thread::current();
 
     GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
@@ -281,27 +281,22 @@ GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo*
         return GST_PAD_PROBE_OK;
     }
 
-    GST_TRACE_OBJECT(m_pipeline.get(), "posting end-of-append request to bus");
-    GstStructure* structure = gst_structure_new_empty("end-of-append");
-    GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
-    gst_bus_post(m_bus.get(), message);
+    GST_TRACE_OBJECT(m_pipeline.get(), "Posting end-of-append task to the main thread");
+    m_taskQueue.enqueueTask([this]() {
+        handleEndOfAppend();
+    });
     return GST_PAD_PROBE_DROP;
 }
 
 void AppendPipeline::clearPlayerPrivate()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_DEBUG("cleaning private player");
 
     // Make sure that AppendPipeline won't process more data from now on and
     // instruct handleNewSample to abort itself from now on as well.
     setAppendState(AppendState::Invalid);
-
-    {
-        LockHolder locker(m_padAddRemoveLock);
-        m_playerPrivate = nullptr;
-        m_padAddRemoveCondition.notifyOne();
-    }
+    m_taskQueue.startAborting();
 
     // And now that no handleNewSample operations will remain stalled waiting
     // for the main thread, stop the pipeline.
@@ -320,49 +315,6 @@ void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
         m_playerPrivate->handleSyncMessage(message);
 }
 
-void AppendPipeline::handleApplicationMessage(GstMessage* message)
-{
-    ASSERT(WTF::isMainThread());
-
-    const GstStructure* structure = gst_message_get_structure(message);
-
-    if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
-        GRefPtr<GstPad> demuxerSrcPad;
-        gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
-        ASSERT(demuxerSrcPad);
-        connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "appsink-caps-changed")) {
-        appsinkCapsChanged();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "appsink-new-sample")) {
-        m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
-        consumeAppsinkAvailableSamples();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "appsink-eos")) {
-        appsinkEOS();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "demuxer-no-more-pads")) {
-        demuxerNoMorePads();
-        return;
-    }
-
-    if (gst_structure_has_name(structure, "end-of-append")) {
-        handleEndOfAppend();
-        return;
-    }
-
-    ASSERT_NOT_REACHED();
-}
-
 void AppendPipeline::demuxerNoMorePads()
 {
     GST_TRACE("calling didReceiveInitializationSegment");
@@ -373,7 +325,7 @@ void AppendPipeline::demuxerNoMorePads()
 
 void AppendPipeline::handleStateChangeMessage(GstMessage* message)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
         GstState currentState, newState;
@@ -391,7 +343,7 @@ void AppendPipeline::handleStateChangeMessage(GstMessage* message)
 
 gint AppendPipeline::id()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (m_id)
         return m_id;
@@ -424,7 +376,7 @@ gint AppendPipeline::id()
 
 void AppendPipeline::setAppendState(AppendState newAppendState)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     // Valid transitions:
     // NotStarted-->Ongoing-->DataStarve-->NotStarted
     //           |         |            `->Aborting-->NotStarted
@@ -564,7 +516,7 @@ void AppendPipeline::setAppendState(AppendState newAppendState)
 
 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
@@ -592,7 +544,7 @@ void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
 
 void AppendPipeline::appsinkCapsChanged()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (!m_appsink)
         return;
@@ -616,7 +568,7 @@ void AppendPipeline::appsinkCapsChanged()
 
 void AppendPipeline::handleEndOfAppend()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_TRACE_OBJECT(m_pipeline.get(), "received end-of-append");
 
     // Regardless of the state transition, the result is the same: didReceiveAllPendingSamples() is called.
@@ -637,7 +589,7 @@ void AppendPipeline::handleEndOfAppend()
 
 void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
         GST_WARNING("Received sample without buffer from appsink.");
@@ -673,7 +625,7 @@ void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
 
 void AppendPipeline::appsinkEOS()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     switch (m_appendState) {
     case AppendState::Aborting:
@@ -694,7 +646,7 @@ void AppendPipeline::appsinkEOS()
 
 void AppendPipeline::didReceiveInitializationSegment()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
 
@@ -726,7 +678,7 @@ void AppendPipeline::didReceiveInitializationSegment()
 
 AtomicString AppendPipeline::trackId()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     if (!m_track)
         return AtomicString();
@@ -736,7 +688,7 @@ AtomicString AppendPipeline::trackId()
 
 void AppendPipeline::consumeAppsinkAvailableSamples()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
 
     GRefPtr<GstSample> sample;
     int batchedSampleCount = 0;
@@ -750,7 +702,7 @@ void AppendPipeline::consumeAppsinkAvailableSamples()
 
 void AppendPipeline::resetPipeline()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_DEBUG("resetting pipeline");
 
     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
@@ -769,7 +721,7 @@ void AppendPipeline::resetPipeline()
 
 void AppendPipeline::abort()
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_DEBUG("aborting");
 
     m_pendingBuffer = nullptr;
@@ -818,9 +770,9 @@ GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
     return GST_FLOW_OK;
 }
 
-GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
+GstFlowReturn AppendPipeline::handleAppsinkNewSampleFromStreamingThread(GstElement*)
 {
-    ASSERT(!WTF::isMainThread());
+    ASSERT(!isMainThread());
     if (&WTF::Thread::current() != m_streamingThread) {
         // m_streamingThreadId has been initialized in appsrcEndOfAppendCheckerProbe().
         // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
@@ -838,10 +790,11 @@ GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
     }
 
     if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
-        GstStructure* structure = gst_structure_new_empty("appsink-new-sample");
-        GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
-        gst_bus_post(m_bus.get(), message);
-        GST_TRACE("appsink-new-sample message posted to bus");
+        GST_TRACE("Posting appsink-new-sample task to the main thread");
+        m_taskQueue.enqueueTask([this]() {
+            m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
+            consumeAppsinkAvailableSamples();
+        });
     }
 
     return GST_FLOW_OK;
@@ -879,8 +832,9 @@ createOptionalParserForFormat(GstPad* demuxerSrcPad)
     return nullptr;
 }
 
-void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
+void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad)
 {
+    ASSERT(!isMainThread());
     if (!m_appsink)
         return;
 
@@ -905,23 +859,14 @@ void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerS
     else
         m_initialDuration = MediaTime::positiveInfiniteTime();
 
-    if (WTF::isMainThread())
+    GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread");
+    auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() {
         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
-    else {
-        // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
-        LockHolder locker(m_padAddRemoveLock);
-        if (!m_playerPrivate)
-            return;
-
-        GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
-        GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
-        gst_bus_post(m_bus.get(), message);
-        GST_TRACE("demuxer-connect-to-appsink message posted to bus");
-
-        m_padAddRemoveCondition.wait(m_padAddRemoveLock);
-
-        if (!m_playerPrivate)
-            return;
+        return AbortableTaskQueue::Void();
+    });
+    if (!response) {
+        // The AppendPipeline has been destroyed or aborted before we received a response.
+        return;
     }
 
     // Must be done in the thread we were called from (usually streaming thread).
@@ -966,14 +911,13 @@ void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerS
 
 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
 {
-    ASSERT(WTF::isMainThread());
+    ASSERT(isMainThread());
     GST_DEBUG("Connecting to appsink");
 
     const String& type = m_sourceBufferPrivate->type().containerType();
     if (type.endsWith("webm"))
         gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);
 
-    LockHolder locker(m_padAddRemoveLock);
     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
 
     // Only one stream per demuxer is supported.
@@ -982,7 +926,6 @@ void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
 
     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
-        m_padAddRemoveCondition.notifyOne();
         return;
     }
 
@@ -1018,8 +961,6 @@ void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
         // This is going to cause an error which will detach the SourceBuffer and tear down this
         // AppendPipeline, so we need the padAddRemove lock released before continuing.
         m_track = nullptr;
-        m_padAddRemoveCondition.notifyOne();
-        locker.unlockEarly();
         didReceiveInitializationSegment();
         return;
     default:
@@ -1030,12 +971,13 @@ void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
     m_appsinkCaps = WTFMove(caps);
     if (m_playerPrivate)
         m_playerPrivate->trackDetected(this, m_track, true);
-
-    m_padAddRemoveCondition.notifyOne();
 }
 
 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
 {
+    // Note: This function can be called either from the streaming thread (e.g. if a strange initialization segment with
+    // incompatible tracks is appended and the srcpad disconnected) or -- more usually -- from the main thread, when
+    // a state change is made to bring the demuxer down. (State change operations run in the main thread.)
     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
 
     GST_DEBUG("Disconnecting appsink");
@@ -1049,23 +991,6 @@ void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
 }
 
-void AppendPipeline::appendPipelineDemuxerNoMorePadsFromAnyThread()
-{
-    GST_TRACE("appendPipelineDemuxerNoMorePadsFromAnyThread");
-    GstStructure* structure = gst_structure_new_empty("demuxer-no-more-pads");
-    GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
-    gst_bus_post(m_bus.get(), message);
-    GST_TRACE("appendPipelineDemuxerNoMorePadsFromAnyThread - posted to bus");
-}
-
-static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
-{
-    GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
-    GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
-    gst_bus_post(appendPipeline->bus(), message);
-    GST_TRACE("appsink-caps-changed message posted to bus");
-}
-
 #if !LOG_DISABLED
 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
 {
@@ -1105,40 +1030,6 @@ static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadP
     return GST_PAD_PROBE_DROP;
 }
 
-static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
-{
-    appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
-}
-
-static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
-{
-    appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
-}
-
-static void appendPipelineDemuxerNoMorePads(GstElement*, AppendPipeline* appendPipeline)
-{
-    appendPipeline->appendPipelineDemuxerNoMorePadsFromAnyThread();
-}
-
-static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
-{
-    return appendPipeline->handleNewAppsinkSample(appsink);
-}
-
-static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
-{
-    if (WTF::isMainThread())
-        appendPipeline->appsinkEOS();
-    else {
-        GstStructure* structure = gst_structure_new_empty("appsink-eos");
-        GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
-        gst_bus_post(appendPipeline->bus(), message);
-        GST_TRACE("appsink-eos message posted to bus");
-    }
-
-    GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
-}
-
 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
 {
     // matroskademux sets GstSegment.start to the PTS of the first frame.
index da4f75e..3ad4d34 100644 (file)
@@ -22,6 +22,7 @@
 
 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
 
+#include "AbortableTaskQueue.h"
 #include "GStreamerCommon.h"
 #include "MediaPlayerPrivateGStreamerMSE.h"
 #include "MediaSourceClientGStreamerMSE.h"
@@ -45,21 +46,28 @@ struct PadProbeInformation {
 
 class AppendPipeline : public ThreadSafeRefCounted<AppendPipeline> {
 public:
-    enum class AppendState { Invalid, NotStarted, Ongoing, DataStarve, Sampling, LastSample, Aborting };
-
     AppendPipeline(Ref<MediaSourceClientGStreamerMSE>, Ref<SourceBufferPrivateGStreamer>, MediaPlayerPrivateGStreamerMSE&);
     virtual ~AppendPipeline();
 
+    GstFlowReturn pushNewBuffer(GstBuffer*);
+    void clearPlayerPrivate();
+    void abort();
+    Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate() { return m_sourceBufferPrivate.get(); }
+    GstCaps* appsinkCaps() { return m_appsinkCaps.get(); }
+    RefPtr<WebCore::TrackPrivateBase> track() { return m_track; }
+    MediaPlayerPrivateGStreamerMSE* playerPrivate() { return m_playerPrivate; }
+
+private:
+    enum class AppendState { Invalid, NotStarted, Ongoing, DataStarve, Sampling, LastSample, Aborting };
+
     void handleNeedContextSyncMessage(GstMessage*);
-    void handleApplicationMessage(GstMessage*);
     void handleStateChangeMessage(GstMessage*);
 
     gint id();
     AppendState appendState() { return m_appendState; }
     void setAppendState(AppendState);
 
-    GstFlowReturn handleNewAppsinkSample(GstElement*);
-    GstFlowReturn pushNewBuffer(GstBuffer*);
+    GstFlowReturn handleAppsinkNewSampleFromStreamingThread(GstElement*);
 
     // Takes ownership of caps.
     void parseDemuxerSrcPadCaps(GstCaps*);
@@ -69,26 +77,18 @@ public:
     void handleEndOfAppend();
     void didReceiveInitializationSegment();
     AtomicString trackId();
-    void abort();
 
-    void clearPlayerPrivate();
-    Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate() { return m_sourceBufferPrivate.get(); }
     GstBus* bus() { return m_bus.get(); }
     GstElement* pipeline() { return m_pipeline.get(); }
     GstElement* appsrc() { return m_appsrc.get(); }
     GstElement* appsink() { return m_appsink.get(); }
     GstCaps* demuxerSrcPadCaps() { return m_demuxerSrcPadCaps.get(); }
-    GstCaps* appsinkCaps() { return m_appsinkCaps.get(); }
-    MediaPlayerPrivateGStreamerMSE* playerPrivate() { return m_playerPrivate; }
-    RefPtr<WebCore::TrackPrivateBase> track() { return m_track; }
     WebCore::MediaSourceStreamTypeGStreamer streamType() { return m_streamType; }
 
     void disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*);
-    void appendPipelineDemuxerNoMorePadsFromAnyThread();
-    void connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad*);
+    void connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad*);
     void connectDemuxerSrcPadToAppsink(GstPad*);
 
-private:
     void resetPipeline();
     void checkEndOfAppend();
     void demuxerNoMorePads();
@@ -98,6 +98,7 @@ private:
     GstPadProbeReturn appsrcEndOfAppendCheckerProbe(GstPadProbeInfo*);
 
     static void staticInitialization();
+    static const char* dumpAppendState(AppendPipeline::AppendState);
 
     static std::once_flag s_staticInitializationFlag;
     static GType s_endOfAppendMetaType;
@@ -131,9 +132,6 @@ private:
     // queue, instead of it growing unbounded.
     std::atomic_flag m_wasBusAlreadyNotifiedOfAvailableSamples;
 
-    Lock m_padAddRemoveLock;
-    Condition m_padAddRemoveCondition;
-
     GRefPtr<GstCaps> m_appsinkCaps;
     GRefPtr<GstCaps> m_demuxerSrcPadCaps;
     FloatSize m_presentationSize;
@@ -158,6 +156,8 @@ private:
     WebCore::MediaSourceStreamTypeGStreamer m_streamType;
     RefPtr<WebCore::TrackPrivateBase> m_track;
 
+    AbortableTaskQueue m_taskQueue;
+
     GRefPtr<GstBuffer> m_pendingBuffer;
 };
 
index b80654c..17e5a59 100644 (file)
@@ -1,3 +1,23 @@
+2018-11-12  Alicia Boya García  <aboya@igalia.com>
+
+        [MSE][GStreamer] Introduce AbortableTaskQueue
+        https://bugs.webkit.org/show_bug.cgi?id=190902
+
+        Reviewed by Xabier Rodriguez-Calvar.
+
+        Tests for AbortableTaskQueue are included.
+
+        * TestWebKitAPI/TestWebKitAPI.xcodeproj/project.pbxproj:
+        * TestWebKitAPI/PlatformGTK.cmake:
+        * TestWebKitAPI/Tests/WebCore/AbortableTaskQueue.cpp: Added.
+        (TestWebKitAPI::TEST):
+        (TestWebKitAPI::FancyResponse::FancyResponse):
+        (TestWebKitAPI::FancyResponse::operator=):
+        (TestWebKitAPI::DeterministicScheduler::DeterministicScheduler):
+        (TestWebKitAPI::DeterministicScheduler::ThreadContext::ThreadContext):
+        (TestWebKitAPI::DeterministicScheduler::ThreadContext::waitMyTurn):
+        (TestWebKitAPI::DeterministicScheduler::ThreadContext::yieldToThread):
+
 2018-11-11  Fujii Hironori  <Hironori.Fujii@sony.com>
 
         run-bindings-tests is timing out in some WinCairo bots
index b0e268e..64ca400 100644 (file)
@@ -104,6 +104,7 @@ add_executable(TestWebCore
     ${TESTWEBKITAPI_DIR}/Tests/WebCore/URLParser.cpp
     ${TESTWEBKITAPI_DIR}/Tests/WebCore/UserAgentQuirks.cpp
     ${TESTWEBKITAPI_DIR}/Tests/WebCore/SampleMap.cpp
+    ${TESTWEBKITAPI_DIR}/Tests/WebCore/AbortableTaskQueue.cpp
 )
 
 target_link_libraries(TestWebCore ${test_webcore_LIBRARIES})
index 14366cc..84d90b8 100644 (file)
                7CCE7EDB1A411A9200447C4C /* CSSParser.cpp in Sources */ = {isa = PBXBuildFile; fileRef = CD5451E919E41F9D0016936F /* CSSParser.cpp */; };
                7CCE7EDC1A411A9200447C4C /* CalculationValue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 93A720E518F1A0E800A848E1 /* CalculationValue.cpp */; };
                7CCE7EDD1A411A9200447C4C /* TimeRanges.cpp in Sources */ = {isa = PBXBuildFile; fileRef = CDC2C7141797089D00E627FB /* TimeRanges.cpp */; };
+               CD51C2A93E78B5C1FF337C08 /* AbortableTaskQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = ABF510632A19B8AC7EC40E17 /* AbortableTaskQueue.cpp */; };
                7CCE7EDE1A411A9200447C4C /* URL.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 440A1D3814A0103A008A66F2 /* URL.cpp */; };
                7CCE7EDF1A411A9200447C4C /* LayoutUnit.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 14464012167A8305000BD218 /* LayoutUnit.cpp */; };
                7CCE7EE01A411A9A00447C4C /* EditorCommands.mm in Sources */ = {isa = PBXBuildFile; fileRef = BCAA485714A044D40088FAC4 /* EditorCommands.mm */; };
                CDBFCC421A9FF44800A7B691 /* FullscreenZoomInitialFrame.html */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.html; path = FullscreenZoomInitialFrame.html; sourceTree = "<group>"; };
                CDBFCC431A9FF44800A7B691 /* FullscreenZoomInitialFrame.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = FullscreenZoomInitialFrame.mm; sourceTree = "<group>"; };
                CDC2C7141797089D00E627FB /* TimeRanges.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = TimeRanges.cpp; sourceTree = "<group>"; };
+               ABF510632A19B8AC7EC40E17 /* AbortableTaskQueue.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = AbortableTaskQueue.cpp; sourceTree = "<group>"; };
                CDC8E4851BC5B19400594FEC /* AudioSessionCategoryIOS.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = AudioSessionCategoryIOS.mm; sourceTree = "<group>"; };
                CDC8E4891BC5C96200594FEC /* video-with-audio.html */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.html; path = "video-with-audio.html"; sourceTree = "<group>"; };
                CDC8E48A1BC5C96200594FEC /* video-with-audio.mp4 */ = {isa = PBXFileReference; lastKnownFileType = file; path = "video-with-audio.mp4"; sourceTree = "<group>"; };
                                CD89D0371C4EDB1300040A04 /* cocoa */,
                                A1EC11851F4253D900D0146E /* ios */,
                                3162AE9A1E6F2F8F000E4DBC /* mac */,
+                               ABF510632A19B8AC7EC40E17 /* AbortableTaskQueue.cpp */,
                                7A909A6F1D877475007E10F8 /* AffineTransform.cpp */,
                                6354F4D01F7C3AB500D89DF3 /* ApplicationManifestParser.cpp */,
                                93A720E518F1A0E800A848E1 /* CalculationValue.cpp */,
diff --git a/Tools/TestWebKitAPI/Tests/WebCore/AbortableTaskQueue.cpp b/Tools/TestWebKitAPI/Tests/WebCore/AbortableTaskQueue.cpp
new file mode 100644 (file)
index 0000000..5df8f5a
--- /dev/null
@@ -0,0 +1,257 @@
+/*
+ * Copyright (C) 2018 Igalia, S.L.
+ * Copyright (C) 2018 Metrological Group B.V.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+
+#include <AbortableTaskQueue.h>
+#include <Utilities.h>
+#include <wtf/Threading.h>
+
+using namespace WebCore;
+
+namespace TestWebKitAPI {
+
+TEST(AbortableTaskQueue, AsyncTasks)
+{
+    AbortableTaskQueue taskQueue;
+    bool testFinished { false };
+    int currentStep { 0 };
+    RunLoop::initializeMainRunLoop();
+
+    auto backgroundThreadFunction = [&]() {
+        EXPECT_FALSE(isMainThread());
+        taskQueue.enqueueTask([&]() {
+            EXPECT_TRUE(isMainThread());
+            currentStep++;
+            EXPECT_EQ(1, currentStep);
+        });
+        taskQueue.enqueueTask([&]() {
+            EXPECT_TRUE(isMainThread());
+            currentStep++;
+            EXPECT_EQ(2, currentStep);
+            testFinished = true;
+        });
+    };
+    RunLoop::current().dispatch([backgroundThreadFunction = WTFMove(backgroundThreadFunction)]() mutable {
+        WTF::Thread::create("atq-background", WTFMove(backgroundThreadFunction))->detach();
+    });
+
+    Util::run(&testFinished);
+}
+
+struct FancyResponse {
+    WTF_MAKE_NONCOPYABLE(FancyResponse);
+public:
+    FancyResponse(int fancyInt)
+        : fancyInt(fancyInt)
+    { }
+
+    FancyResponse(FancyResponse&& a)
+        : fancyInt(a.fancyInt)
+    { }
+
+    FancyResponse& operator=(FancyResponse&& a)
+    {
+        this->fancyInt = a.fancyInt;
+        return *this;
+    }
+
+    int fancyInt;
+};
+
+TEST(AbortableTaskQueue, SyncTasks)
+{
+    AbortableTaskQueue taskQueue;
+    bool testFinished { false };
+    int currentStep { 0 };
+    RunLoop::initializeMainRunLoop();
+
+    auto backgroundThreadFunction = [&]() {
+        EXPECT_FALSE(isMainThread());
+        std::optional<FancyResponse> response = taskQueue.enqueueTaskAndWait<FancyResponse>([&]() -> FancyResponse {
+            EXPECT_TRUE(isMainThread());
+            currentStep++;
+            EXPECT_EQ(1, currentStep);
+            FancyResponse returnValue(100);
+            return returnValue;
+        });
+        currentStep++;
+        EXPECT_EQ(2, currentStep);
+        EXPECT_TRUE(response);
+        EXPECT_EQ(100, response->fancyInt);
+        RunLoop::main().dispatch([&]() {
+            testFinished = true;
+        });
+    };
+    RunLoop::current().dispatch([backgroundThreadFunction = WTFMove(backgroundThreadFunction)]() mutable {
+        WTF::Thread::create("atq-background", WTFMove(backgroundThreadFunction))->detach();
+    });
+
+    Util::run(&testFinished);
+}
+
+template <typename ThreadEnum>
+class DeterministicScheduler {
+    WTF_MAKE_NONCOPYABLE(DeterministicScheduler);
+public:
+    DeterministicScheduler(ThreadEnum firstThread)
+        : m_currentThread(firstThread)
+    { }
+
+    class ThreadContext {
+        WTF_MAKE_NONCOPYABLE(ThreadContext);
+    public:
+        ThreadContext(DeterministicScheduler& scheduler, ThreadEnum thisThread)
+            : m_scheduler(scheduler), m_thisThread(thisThread)
+        { }
+
+        void waitMyTurn()
+        {
+            LockHolder lock(m_scheduler.m_mutex);
+            m_scheduler.m_currentThreadChanged.wait(m_scheduler.m_mutex, [this]() {
+                return m_scheduler.m_currentThread == m_thisThread;
+            });
+        }
+
+        void yieldToThread(ThreadEnum nextThread)
+        {
+            LockHolder lock(m_scheduler.m_mutex);
+            m_scheduler.m_currentThread = nextThread;
+            m_scheduler.m_currentThreadChanged.notifyAll();
+        }
+
+    private:
+        DeterministicScheduler& m_scheduler;
+        ThreadEnum m_thisThread;
+    };
+
+
+private:
+    ThreadEnum m_currentThread;
+    Condition m_currentThreadChanged;
+    Lock m_mutex;
+};
+
+enum class TestThread {
+    Main,
+    Background
+};
+
+TEST(AbortableTaskQueue, Abort)
+{
+    DeterministicScheduler<TestThread> scheduler(TestThread::Background);
+
+    AbortableTaskQueue taskQueue;
+    bool testFinished { false };
+    RunLoop::initializeMainRunLoop();
+
+    auto backgroundThreadFunction = [&]() {
+        EXPECT_FALSE(isMainThread());
+        DeterministicScheduler<TestThread>::ThreadContext backgroundThreadContext(scheduler, TestThread::Background);
+
+        taskQueue.enqueueTask([]() {
+            // This task should not have been able to run under the scheduling of this test.
+            EXPECT_TRUE(false);
+        });
+        backgroundThreadContext.yieldToThread(TestThread::Main);
+        backgroundThreadContext.waitMyTurn();
+
+        // Main thread has called startAborting().
+
+        taskQueue.enqueueTask([]() {
+            // This task should not have been able to run under the scheduling of this test.
+            EXPECT_TRUE(false);
+        });
+        // This call must return immediately because we are aborting.
+        std::optional<FancyResponse> response = taskQueue.enqueueTaskAndWait<FancyResponse>([]() -> FancyResponse {
+            // This task should not have been able to run under the scheduling of this test.
+            EXPECT_TRUE(false);
+            return FancyResponse(100);
+        });
+        EXPECT_FALSE(response);
+        backgroundThreadContext.yieldToThread(TestThread::Main);
+        backgroundThreadContext.waitMyTurn();
+
+        // Main thread has called finishAborting().
+
+        taskQueue.enqueueTask([&]() {
+            testFinished = true;
+        });
+    };
+    RunLoop::current().dispatch([&, backgroundThreadFunction = WTFMove(backgroundThreadFunction)]() mutable {
+        EXPECT_TRUE(isMainThread());
+        DeterministicScheduler<TestThread>::ThreadContext mainThreadContext(scheduler, TestThread::Main);
+        WTF::Thread::create("atq-background", WTFMove(backgroundThreadFunction))->detach();
+
+        mainThreadContext.waitMyTurn();
+
+        taskQueue.startAborting();
+        mainThreadContext.yieldToThread(TestThread::Background);
+        mainThreadContext.waitMyTurn();
+
+        taskQueue.finishAborting();
+        mainThreadContext.yieldToThread(TestThread::Background);
+    });
+
+    Util::run(&testFinished);
+}
+
+TEST(AbortableTaskQueue, AbortDuringSyncTask)
+{
+    AbortableTaskQueue taskQueue;
+    bool testFinished { false };
+    RunLoop::initializeMainRunLoop();
+
+    auto backgroundThreadFunction = [&]() {
+        EXPECT_FALSE(isMainThread());
+
+        std::optional<FancyResponse> response = taskQueue.enqueueTaskAndWait<FancyResponse>([]() -> FancyResponse {
+            // This task should not have been able to run under the scheduling of this test.
+            EXPECT_TRUE(false);
+            return FancyResponse(100);
+        });
+
+        // Main thread has called startAborting().
+        EXPECT_FALSE(response);
+
+        RunLoop::main().dispatch([&]() {
+            testFinished = true;
+        });
+    };
+    RunLoop::current().dispatch([&, backgroundThreadFunction = WTFMove(backgroundThreadFunction)]() mutable {
+        EXPECT_TRUE(isMainThread());
+        WTF::Thread::create("atq-background", WTFMove(backgroundThreadFunction))->detach();
+
+        // Give the background thread a bit of time to get blocked waiting for a response.
+        WTF::sleep(100_ms);
+
+        taskQueue.startAborting();
+    });
+
+    Util::run(&testFinished);
+}
+
+} // namespace TestWebKitAPI