[MSE][GStreamer] Introduce AbortableTaskQueue
[WebKit-https.git] / Source / WebCore / platform / AbortableTaskQueue.h
1 /*
2  * Copyright (C) 2018 Igalia, S.L.
3  * Copyright (C) 2018 Metrological Group B.V.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * aint with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #pragma once
22
23 #include <wtf/Condition.h>
24 #include <wtf/Deque.h>
25 #include <wtf/Function.h>
26 #include <wtf/Lock.h>
27 #include <wtf/RunLoop.h>
28 #include <wtf/StdLibExtras.h>
29
30 namespace WebCore {
31
32 /* AbortableTaskQueue is a high-level synchronization object for cases where abortable work is done in
33  * background thread(s) that sometimes needs to post tasks to the main thread.
34  *
35  * The tasks posted by the background thread(s) to the main thread may be asynchronous, using enqueueTask(),
36  * which returns immediately; or synchronous, using enqueueTaskAndWait(), which blocks the calling
37  * background thread until the task is run by the main thread (possibly returning a value).
38  *
39  * What makes AbortableTaskQueue different from other task queueing mechanisms is that it provides a two-phase
40  * protocol for aborting the work in the background thread in presence of queued tasks without deadlocks or
41  * late notification bugs.
42  *
43  * Without a two-phase design deadlocks would occur when attempting an abort if a background thread was
44  * blocked in a synchronous task and needed to return from there for the abort to be handled. Also, without
45  * a design like this, tasks already enqueued at that point or soon thereafter until the abort is complete
46  * would still be handled by the main thread, even though we don't want to anymore.
47  *
48  * Aborting background processing with AbortableTaskQueue is a several step process:
49  *
50  *  1. Call abortableTaskQueue.startAborting() -- This will make any current or future (until further notice)
51  *     synchronous tasks fail immediately, so that we don't deadlock in the next step. Also, tasks of any kind
52  *     already enqueued will not be run.
53  *
54  *  2. Send the abort signal to the background threads. This is completely application specific. For instance,
55  *     in the AppendPipeline case you would flush or reset the GStreamer pipeline here. Wait until all the
56  *     background threads have finished aborting.
57  *
58  *  3. Call abortableTaskQueue.finishAborting() -- This will allow new tasks queued from this point on to be
59  *     handled just as before the abort was made.
60  *
61  *  4. After this, the background thread(s) can be put to work again safely.
62  *
63  * This class is used for handling demuxer events in AppendPipeline, taking into account demuxing can be
64  * aborted at any moment if SourceBuffer.abort() is called or the SourceBuffer is destroyed. */
65 class AbortableTaskQueue final {
66     WTF_MAKE_NONCOPYABLE(AbortableTaskQueue);
67 public:
68     AbortableTaskQueue()
69     {
70         ASSERT(isMainThread());
71     }
72
73     ~AbortableTaskQueue()
74     {
75         ASSERT(isMainThread());
76         ASSERT(!m_mutex.isHeld());
77         ASSERT(m_channel.isEmpty());
78     }
79
80     // ===========================
81     // Methods for the main thread
82     // ===========================
83
84     // Starts an abort process.
85     //
86     // Tasks already queued will be discarded.
87     //
88     // Until finishAborting is called, all present and future calls to enqueueTaskAndWait() will immediately
89     // return an empty optional.
90     //
91     // This method is idempotent.
92     void startAborting()
93     {
94         ASSERT(isMainThread());
95
96         {
97             LockHolder lockHolder(m_mutex);
98             m_aborting = true;
99             cancelAllTasks();
100         }
101         m_abortedOrResponseSet.notifyAll();
102     }
103
104     // Declares the previous abort finished.
105     //
106     // In order to avoid race conditions the background threads must be unable to post tasks at this point.
107     void finishAborting()
108     {
109         ASSERT(isMainThread());
110
111         LockHolder lockHolder(m_mutex);
112         ASSERT(m_aborting);
113         m_aborting = false;
114     }
115
116     // ==================================
117     // Methods for the background threads
118     // ==================================
119
120     // Enqueue a task to be run on the main thread. The task may be cancelled if an abort starts before it's
121     // handled.
122     void enqueueTask(WTF::Function<void()>&& mainThreadTaskHandler)
123     {
124         ASSERT(!isMainThread());
125
126         LockHolder lockHolder(m_mutex);
127         if (m_aborting)
128             return;
129
130         postTask(WTFMove(mainThreadTaskHandler));
131     }
132
133     // Enqueue a task to be run on the main thread and wait for it to return. The return value of the task is
134     // forwarded to the background thread, wrapped in an optional.
135     //
136     // If we are aborting, the call finishes immediately, returning an empty optional.
137     template<typename R>
138     std::optional<R> enqueueTaskAndWait(WTF::Function<R()>&& mainThreadTaskHandler)
139     {
140         // Don't deadlock the main thread with itself.
141         ASSERT(!isMainThread());
142
143         LockHolder lockHolder(m_mutex);
144         if (m_aborting)
145             return std::nullopt;
146
147         std::optional<R> response = std::nullopt;
148         postTask([this, &response, &mainThreadTaskHandler]() {
149             R responseValue = mainThreadTaskHandler();
150             LockHolder lockHolder(m_mutex);
151             response = WTFMove(responseValue);
152             m_abortedOrResponseSet.notifyAll();
153         });
154         m_abortedOrResponseSet.wait(m_mutex, [this, &response]() {
155             return m_aborting || response;
156         });
157         return response;
158     }
159
160     // This is class is provided for convenience when you want to use enqueueTaskAndWait() but
161     // you don't need any particular data from the main thread in return and just knowing that it finished
162     // running the handler function is enough.
163     class Void { };
164
165 private:
166     // Protected state:
167     //   Main thread: read-write. Writes must be made with the lock.
168     //   Background threads: read only. Reads must be made with the lock.
169     class Task : public ThreadSafeRefCounted<Task> {
170         WTF_MAKE_NONCOPYABLE(Task);
171         WTF_MAKE_FAST_ALLOCATED(Task);
172     public:
173         static Ref<Task> create(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback)
174         {
175             return adoptRef(*new Task(taskQueue, WTFMove(taskCallback)));
176         }
177
178         bool isCancelled() const
179         {
180             return !m_taskQueue;
181         }
182
183         void cancel()
184         {
185             ASSERT(!isCancelled());
186             m_taskCallback = nullptr;
187             m_taskQueue = nullptr;
188         }
189
190         void dispatch()
191         {
192             ASSERT(isMainThread());
193             if (isCancelled())
194                 return;
195
196             LockHolder lock(m_taskQueue->m_mutex);
197             ASSERT(this == m_taskQueue->m_channel.first().ptr());
198             m_taskQueue->m_channel.removeFirst();
199             lock.unlockEarly();
200             m_taskCallback();
201         }
202
203     private:
204         AbortableTaskQueue* m_taskQueue;
205         WTF::Function<void()> m_taskCallback;
206
207         Task(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback)
208             : m_taskQueue(taskQueue), m_taskCallback(WTFMove(taskCallback))
209         { }
210     };
211
212     void postTask(WTF::Function<void()>&& callback)
213     {
214         ASSERT(m_mutex.isHeld());
215         Ref<Task> task = Task::create(this, WTFMove(callback));
216         m_channel.append(task.copyRef());
217         RunLoop::main().dispatch([task = WTFMove(task)]() { task->dispatch(); });
218     }
219
220     void cancelAllTasks()
221     {
222         ASSERT(isMainThread());
223         ASSERT(m_mutex.isHeld());
224         for (Ref<Task>& task : m_channel)
225             task->cancel();
226         m_channel.clear();
227     }
228
229     bool m_aborting { false };
230     Lock m_mutex;
231     Condition m_abortedOrResponseSet;
232     WTF::Deque<Ref<Task>> m_channel;
233 };
234
235 } // namespace WebCore