Rename AtomicString to AtomString
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016, 2017 Metrological Group B.V.
3  * Copyright (C) 2016, 2017 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * along with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GStreamerCommon.h"
28 #include "GStreamerEMEUtilities.h"
29 #include "GStreamerMediaDescription.h"
30 #include "GStreamerRegistryScannerMSE.h"
31 #include "MediaSampleGStreamer.h"
32 #include "InbandTextTrackPrivateGStreamer.h"
33 #include "MediaDescription.h"
34 #include "SourceBufferPrivateGStreamer.h"
35 #include "VideoTrackPrivateGStreamer.h"
36 #include <functional>
37 #include <gst/app/gstappsink.h>
38 #include <gst/app/gstappsrc.h>
39 #include <gst/gst.h>
40 #include <gst/pbutils/pbutils.h>
41 #include <gst/video/video.h>
42 #include <wtf/Condition.h>
43 #include <wtf/glib/GLibUtilities.h>
44 #include <wtf/glib/RunLoopSourcePriority.h>
45 #include <wtf/text/StringConcatenateNumbers.h>
46
47 GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
48 #define GST_CAT_DEFAULT webkit_mse_debug
49
50 namespace WebCore {
51
52 GType AppendPipeline::s_endOfAppendMetaType = 0;
53 const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr;
54 std::once_flag AppendPipeline::s_staticInitializationFlag;
55
56 struct EndOfAppendMeta {
57     GstMeta base;
58     static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; }
59     static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); }
60     static void free(GstMeta*, GstBuffer*) { }
61 };
62
63 void AppendPipeline::staticInitialization()
64 {
65     ASSERT(isMainThread());
66
67     const char* tags[] = { nullptr };
68     s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
69     s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
70 }
71
72 #if !LOG_DISABLED
73 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
74 #endif
75
76 #if ENABLE(ENCRYPTED_MEDIA)
77 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
78 #endif
79
80 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
81
82 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
83
84 // Wrapper for gst_element_set_state() that emits a critical if the state change fails or is not synchronous.
85 static void assertedElementSetState(GstElement* element, GstState desiredState)
86 {
87     GstState oldState;
88     gst_element_get_state(element, &oldState, nullptr, 0);
89
90     GstStateChangeReturn result = gst_element_set_state(element, desiredState);
91
92     GstState newState;
93     gst_element_get_state(element, &newState, nullptr, 0);
94
95     if (desiredState != newState || result != GST_STATE_CHANGE_SUCCESS) {
96         GST_ERROR("AppendPipeline state change failed (returned %d): %" GST_PTR_FORMAT " %d -> %d (expected %d)",
97             static_cast<int>(result), element, static_cast<int>(oldState), static_cast<int>(newState), static_cast<int>(desiredState));
98         ASSERT_NOT_REACHED();
99     }
100 }
101
102 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
103     : m_mediaSourceClient(mediaSourceClient.get())
104     , m_sourceBufferPrivate(sourceBufferPrivate.get())
105     , m_playerPrivate(&playerPrivate)
106     , m_id(0)
107     , m_wasBusAlreadyNotifiedOfAvailableSamples(false)
108     , m_streamType(Unknown)
109 {
110     ASSERT(isMainThread());
111     std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);
112
113     GST_TRACE("Creating AppendPipeline (%p)", this);
114
115     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
116     // The track name is still unknown at this time, though.
117     static size_t appendPipelineCount = 0;
118     String pipelineName = makeString("append-pipeline-",
119         m_sourceBufferPrivate->type().containerType().replace("/", "-"), '-', appendPipelineCount++);
120     m_pipeline = gst_pipeline_new(pipelineName.utf8().data());
121
122     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
123     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
124     gst_bus_enable_sync_message_emission(m_bus.get());
125
126     g_signal_connect(m_bus.get(), "sync-message::error", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
127         appendPipeline->handleErrorSyncMessage(message);
128     }), this);
129     g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
130         appendPipeline->handleNeedContextSyncMessage(message);
131     }), this);
132     g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
133         appendPipeline->handleStateChangeMessage(message);
134     }), this);
135
136     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
137     // below will already take the initial reference and we need an additional one for us.
138     m_appsrc = gst_element_factory_make("appsrc", nullptr);
139
140     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
141     gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) {
142         return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo);
143     }, this, nullptr);
144
145     const String& type = m_sourceBufferPrivate->type().containerType();
146     GST_DEBUG("SourceBuffer containerType: %s", type.utf8().data());
147     if (type.endsWith("mp4") || type.endsWith("aac"))
148         m_demux = gst_element_factory_make("qtdemux", nullptr);
149     else if (type.endsWith("webm"))
150         m_demux = gst_element_factory_make("matroskademux", nullptr);
151     else
152         ASSERT_NOT_REACHED();
153
154     m_appsink = gst_element_factory_make("appsink", nullptr);
155
156     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
157     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
158     gst_base_sink_set_async_enabled(GST_BASE_SINK(m_appsink.get()), FALSE); // No prerolls, no async state changes.
159     gst_base_sink_set_drop_out_of_segment(GST_BASE_SINK(m_appsink.get()), FALSE);
160     gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);
161
162     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
163     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) {
164         if (isMainThread()) {
165             // When changing the pipeline state down to READY the demuxer is unlinked and this triggers a caps notification
166             // because the appsink loses its previously negotiated caps. We are not interested in these unnegotiated caps.
167 #ifndef NDEBUG
168             GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(appendPipeline->m_appsink.get(), "sink"));
169             GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
170             ASSERT(!caps);
171 #endif
172             return;
173         }
174
175         // The streaming thread has just received a new caps and is about to let samples using the
176         // new caps flow. Let's block it until the main thread has consumed the samples with the old
177         // caps and has processed the caps change.
178         appendPipeline->m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([appendPipeline]() {
179             appendPipeline->appsinkCapsChanged();
180             return AbortableTaskQueue::Void();
181         });
182     }), this);
183
184 #if !LOG_DISABLED
185     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
186     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
187     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
188     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
189     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
190     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
191     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
192 #endif
193
194 #if ENABLE(ENCRYPTED_MEDIA)
195     m_appsinkPadEventProbeInformation.appendPipeline = this;
196     m_appsinkPadEventProbeInformation.description = "appsink event probe";
197     m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr);
198 #endif
199
200     // These signals won't be connected outside of the lifetime of "this".
201     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
202         appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad);
203     }), this);
204     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
205         appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
206     }), this);
207     g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
208         ASSERT(!isMainThread());
209         GST_DEBUG("Posting no-more-pads task to main thread");
210         appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
211             appendPipeline->didReceiveInitializationSegment();
212         });
213     }), this);
214     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) -> GstFlowReturn {
215         appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
216         return GST_FLOW_OK;
217     }), this);
218     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
219         // basesrc will emit an EOS after it has received a GST_FLOW_ERROR. That's the only case we are expecting.
220         if (!appendPipeline->m_errorReceived) {
221             GST_ERROR("Unexpected appsink EOS in AppendPipeline");
222             ASSERT_NOT_REACHED();
223         }
224     }), this);
225
226     // Add_many will take ownership of a reference. That's why we used an assignment before.
227     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
228     gst_element_link(m_appsrc.get(), m_demux.get());
229
230     assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);
231 }
232
233 AppendPipeline::~AppendPipeline()
234 {
235     GST_DEBUG_OBJECT(m_pipeline.get(), "Destructing AppendPipeline (%p)", this);
236     ASSERT(isMainThread());
237
238     // Forget all pending tasks and unblock the streaming thread if it was blocked.
239     m_taskQueue.startAborting();
240
241     // Disconnect all synchronous event handlers and probes susceptible of firing from the main thread
242     // when changing the pipeline state.
243
244     if (m_pipeline) {
245         ASSERT(m_bus);
246         g_signal_handlers_disconnect_by_data(m_bus.get(), this);
247         gst_bus_disable_sync_message_emission(m_bus.get());
248         gst_bus_remove_signal_watch(m_bus.get());
249     }
250
251     if (m_appsrc)
252         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
253
254     if (m_demux) {
255 #if !LOG_DISABLED
256         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
257         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
258 #endif
259
260         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
261     }
262
263     if (m_appsink) {
264         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
265         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
266         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
267
268 #if !LOG_DISABLED
269         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
270 #endif
271
272 #if ENABLE(ENCRYPTED_MEDIA)
273         gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId);
274 #endif
275     }
276
277     // We can tear down the pipeline safely now.
278     if (m_pipeline)
279         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
280 }
281
282 void AppendPipeline::handleErrorSyncMessage(GstMessage* message)
283 {
284     ASSERT(!isMainThread());
285     GST_WARNING_OBJECT(m_pipeline.get(), "Demuxing error: %" GST_PTR_FORMAT, message);
286     // Notify the main thread that the append has a decode error.
287     auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this]() {
288         m_errorReceived = true;
289         // appendParsingFailed() will cause resetParserState() to be called.
290         m_sourceBufferPrivate->appendParsingFailed();
291         return AbortableTaskQueue::Void();
292     });
293     // The streaming thread has now been unblocked because we are aborting in the main thread.
294     ASSERT(!response);
295 }
296
297 GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
298 {
299     ASSERT(!isMainThread());
300     m_streamingThread = &WTF::Thread::current();
301
302     GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
303     ASSERT(GST_IS_BUFFER(buffer));
304
305     GST_TRACE_OBJECT(m_pipeline.get(), "Buffer entered appsrcEndOfAppendCheckerProbe: %" GST_PTR_FORMAT, buffer);
306
307     EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType));
308     if (!endOfAppendMeta) {
309         // Normal buffer, nothing to do.
310         return GST_PAD_PROBE_OK;
311     }
312
313     GST_TRACE_OBJECT(m_pipeline.get(), "Posting end-of-append task to the main thread");
314     m_taskQueue.enqueueTask([this]() {
315         handleEndOfAppend();
316     });
317     return GST_PAD_PROBE_DROP;
318 }
319
320 void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
321 {
322     const gchar* contextType = nullptr;
323     gst_message_parse_context_type(message, &contextType);
324     GST_TRACE("context type: %s", contextType);
325
326     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
327     m_playerPrivate->handleSyncMessage(message);
328 }
329
330 void AppendPipeline::handleStateChangeMessage(GstMessage* message)
331 {
332     ASSERT(isMainThread());
333
334     if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
335         GstState currentState, newState;
336         gst_message_parse_state_changed(message, &currentState, &newState, nullptr);
337         CString sourceBufferType = String(m_sourceBufferPrivate->type().raw())
338             .replace("/", "_").replace(" ", "_")
339             .replace("\"", "").replace("\'", "").utf8();
340         CString dotFileName = makeString("webkit-append-",
341             sourceBufferType.data(), '-',
342             gst_element_state_get_name(currentState), '_',
343             gst_element_state_get_name(newState)).utf8();
344         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data());
345     }
346 }
347
348 gint AppendPipeline::id()
349 {
350     ASSERT(isMainThread());
351
352     if (m_id)
353         return m_id;
354
355     static gint s_totalAudio = 0;
356     static gint s_totalVideo = 0;
357     static gint s_totalText = 0;
358
359     switch (m_streamType) {
360     case Audio:
361         m_id = ++s_totalAudio;
362         break;
363     case Video:
364         m_id = ++s_totalVideo;
365         break;
366     case Text:
367         m_id = ++s_totalText;
368         break;
369     case Unknown:
370     case Invalid:
371         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
372         ASSERT_NOT_REACHED();
373         break;
374     }
375
376     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
377
378     return m_id;
379 }
380
381 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
382 {
383     ASSERT(isMainThread());
384
385     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
386     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
387
388     const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get());
389     auto& gstRegistryScanner = GStreamerRegistryScannerMSE::singleton();
390     if (!gstRegistryScanner.isCodecSupported(originalMediaType)) {
391             m_presentationSize = WebCore::FloatSize();
392             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
393     } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
394         Optional<FloatSize> size = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get());
395         if (size.hasValue())
396             m_presentationSize = size.value();
397         else
398             m_presentationSize = WebCore::FloatSize();
399
400         m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
401     } else {
402         m_presentationSize = WebCore::FloatSize();
403         if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX))
404             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
405         else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX))
406             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
407     }
408 }
409
410 void AppendPipeline::appsinkCapsChanged()
411 {
412     ASSERT(isMainThread());
413
414     // Consume any pending samples with the previous caps.
415     consumeAppsinkAvailableSamples();
416
417     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
418     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
419
420     if (!caps)
421         return;
422
423     if (doCapsHaveType(caps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
424         Optional<FloatSize> size = getVideoResolutionFromCaps(caps.get());
425         if (size.hasValue())
426             m_presentationSize = size.value();
427     }
428
429     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
430     bool previousCapsWereNull = !m_appsinkCaps;
431
432     if (m_appsinkCaps != caps) {
433         m_appsinkCaps = WTFMove(caps);
434         m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull);
435     }
436 }
437
438 void AppendPipeline::handleEndOfAppend()
439 {
440     ASSERT(isMainThread());
441     consumeAppsinkAvailableSamples();
442     GST_TRACE_OBJECT(m_pipeline.get(), "Notifying SourceBufferPrivate the append is complete");
443     sourceBufferPrivate()->didReceiveAllPendingSamples();
444 }
445
446 void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
447 {
448     ASSERT(isMainThread());
449
450     if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
451         GST_WARNING("Received sample without buffer from appsink.");
452         return;
453     }
454
455     auto mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());
456
457     GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
458         mediaSample->trackID().string().utf8().data(),
459         mediaSample->presentationTime().toString().utf8().data(),
460         mediaSample->decodeTime().toString().utf8().data(),
461         mediaSample->duration().toString().utf8().data(),
462         mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
463
464     // If we're beyond the duration, ignore this sample.
465     MediaTime duration = m_mediaSourceClient->duration();
466     if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
467         GST_DEBUG_OBJECT(m_pipeline.get(), "Detected sample (%s) beyond the duration (%s), discarding", mediaSample->presentationTime().toString().utf8().data(), duration.toString().utf8().data());
468         return;
469     }
470
471     // Add a gap sample if a gap is detected before the first sample.
472     if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) {
473         GST_DEBUG("Adding gap offset");
474         mediaSample->applyPtsOffset(MediaTime::zeroTime());
475     }
476
477     m_sourceBufferPrivate->didReceiveSample(mediaSample.get());
478 }
479
480 void AppendPipeline::didReceiveInitializationSegment()
481 {
482     ASSERT(isMainThread());
483
484     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
485
486     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
487     initializationSegment.duration = m_mediaSourceClient->duration();
488
489     switch (m_streamType) {
490     case Audio: {
491         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
492         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
493         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
494         initializationSegment.audioTracks.append(info);
495         break;
496     }
497     case Video: {
498         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
499         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
500         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
501         initializationSegment.videoTracks.append(info);
502         break;
503     }
504     default:
505         GST_ERROR("Unsupported stream type or codec");
506         break;
507     }
508
509     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
510 }
511
512 AtomString AppendPipeline::trackId()
513 {
514     ASSERT(isMainThread());
515
516     if (!m_track)
517         return AtomString();
518
519     return m_track->id();
520 }
521
522 void AppendPipeline::consumeAppsinkAvailableSamples()
523 {
524     ASSERT(isMainThread());
525
526     GRefPtr<GstSample> sample;
527     int batchedSampleCount = 0;
528     // In some cases each frame increases the duration of the movie.
529     // Batch duration changes so that if we pick 100 of such samples we don't have to run 100 times
530     // layout for the video controls, but only once.
531     m_playerPrivate->blockDurationChanges();
532     while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
533         appsinkNewSample(WTFMove(sample));
534         batchedSampleCount++;
535     }
536     m_playerPrivate->unblockDurationChanges();
537
538     GST_TRACE_OBJECT(m_pipeline.get(), "batchedSampleCount = %d", batchedSampleCount);
539 }
540
541 void AppendPipeline::resetParserState()
542 {
543     ASSERT(isMainThread());
544     GST_DEBUG_OBJECT(m_pipeline.get(), "Handling resetParserState() in AppendPipeline by resetting the pipeline");
545
546     // FIXME: Implement a flush event-based resetParserState() implementation would allow the initialization segment to
547     // survive, in accordance with the spec.
548
549     // This function restores the GStreamer pipeline to the same state it was when the AppendPipeline constructor
550     // finished. All previously enqueued data is lost and the demuxer is reset, losing all pads and track data.
551
552     // Unlock the streaming thread.
553     m_taskQueue.startAborting();
554
555     // Reset the state of all elements in the pipeline.
556     assertedElementSetState(m_pipeline.get(), GST_STATE_READY);
557
558     // The parser is tear down automatically when the demuxer is reset (see disconnectDemuxerSrcPadFromAppsinkFromAnyThread()).
559     ASSERT(!m_parser);
560
561     // Set the pipeline to PLAYING so that it can be used again.
562     assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);
563
564     // All processing related to the previous append has been aborted and the pipeline is idle.
565     // We can listen again to new requests coming from the streaming thread.
566     m_taskQueue.finishAborting();
567
568 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
569     {
570         static unsigned i = 0;
571         // This is here for debugging purposes. It does not make sense to have it as class member.
572         WTF::String dotFileName = makeString("reset-pipeline-", ++i);
573         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
574     }
575 #endif
576 }
577
578 void AppendPipeline::pushNewBuffer(GRefPtr<GstBuffer>&& buffer)
579 {
580     GST_TRACE_OBJECT(m_pipeline.get(), "pushing data buffer %" GST_PTR_FORMAT, buffer.get());
581     GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer.leakRef());
582     // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. Neither of these should
583     // be true at this point.
584     if (pushDataBufferRet != GST_FLOW_OK) {
585         GST_ERROR_OBJECT(m_pipeline.get(), "Failed to push data buffer into appsrc.");
586         ASSERT_NOT_REACHED();
587     }
588
589     // Push an additional empty buffer that marks the end of the append.
590     // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful
591     // completion of the append.
592     //
593     // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the
594     // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case
595     // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous
596     // buffer has completed.
597
598     GstBuffer* endOfAppendBuffer = gst_buffer_new();
599     gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr);
600
601     GST_TRACE_OBJECT(m_pipeline.get(), "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer);
602     GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer);
603     if (pushEndOfAppendBufferRet != GST_FLOW_OK) {
604         GST_ERROR_OBJECT(m_pipeline.get(), "Failed to push end-of-append buffer into appsrc.");
605         ASSERT_NOT_REACHED();
606     }
607 }
608
609 void AppendPipeline::handleAppsinkNewSampleFromStreamingThread(GstElement*)
610 {
611     ASSERT(!isMainThread());
612     if (&WTF::Thread::current() != m_streamingThread) {
613         // m_streamingThreadId has been initialized in appsrcEndOfAppendCheckerProbe().
614         // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
615         // This error will only raise if someone modifies the pipeline to include more than one streaming thread or
616         // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken.
617         // AppendPipeline should have only one streaming thread. Otherwise we can't detect reliably when an appends has
618         // been demuxed completely.;
619         GST_ERROR_OBJECT(m_pipeline.get(), "Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run.");
620         ASSERT_NOT_REACHED();
621     }
622
623     if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
624         GST_TRACE("Posting appsink-new-sample task to the main thread");
625         m_taskQueue.enqueueTask([this]() {
626             m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
627             consumeAppsinkAvailableSamples();
628         });
629     }
630 }
631
632 static GRefPtr<GstElement>
633 createOptionalParserForFormat(GstPad* demuxerSrcPad)
634 {
635     GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
636     GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
637     const char* mediaType = gst_structure_get_name(structure);
638
639     GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
640     GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));
641
642     if (!g_strcmp0(mediaType, "audio/x-opus")) {
643         GstElement* opusparse = gst_element_factory_make("opusparse", parserName.get());
644         ASSERT(opusparse);
645         g_return_val_if_fail(opusparse, nullptr);
646         return GRefPtr<GstElement>(opusparse);
647     }
648     if (!g_strcmp0(mediaType, "audio/x-vorbis")) {
649         GstElement* vorbisparse = gst_element_factory_make("vorbisparse", parserName.get());
650         ASSERT(vorbisparse);
651         g_return_val_if_fail(vorbisparse, nullptr);
652         return GRefPtr<GstElement>(vorbisparse);
653     }
654     if (!g_strcmp0(mediaType, "video/x-h264")) {
655         GstElement* h264parse = gst_element_factory_make("h264parse", parserName.get());
656         ASSERT(h264parse);
657         g_return_val_if_fail(h264parse, nullptr);
658         return GRefPtr<GstElement>(h264parse);
659     }
660
661     return nullptr;
662 }
663
664 void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad)
665 {
666     ASSERT(!isMainThread());
667
668     GST_DEBUG("connecting to appsink");
669
670     if (m_demux->numsrcpads > 1) {
671         GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
672         gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
673         g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
674         return;
675     }
676
677     GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
678
679     // Only one stream per demuxer is supported.
680     ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
681
682     gint64 timeLength = 0;
683     if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
684         && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
685         m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
686     else
687         m_initialDuration = MediaTime::positiveInfiniteTime();
688
689     GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread");
690     auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() {
691         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
692         return AbortableTaskQueue::Void();
693     });
694     if (!response) {
695         // The AppendPipeline has been destroyed or aborted before we received a response.
696         return;
697     }
698
699     // Must be done in the thread we were called from (usually streaming thread).
700     bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
701         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
702         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
703
704     if (isData) {
705         GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
706         if (!parent)
707             gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
708
709         // Current head of the pipeline being built.
710         GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;
711
712         // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
713         // the contained audio streams in order to know the duration of the frames.
714         // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
715         m_parser = createOptionalParserForFormat(currentSrcPad.get());
716         if (m_parser) {
717             gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
718             gst_element_sync_state_with_parent(m_parser.get());
719
720             GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
721             GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));
722
723             gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
724             currentSrcPad = parserSrcPad;
725         }
726
727         gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());
728
729         gst_element_sync_state_with_parent(m_appsink.get());
730
731         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
732     }
733 }
734
735 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
736 {
737     ASSERT(isMainThread());
738     GST_DEBUG("Connecting to appsink");
739
740     const String& type = m_sourceBufferPrivate->type().containerType();
741     if (type.endsWith("webm"))
742         gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);
743
744     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
745
746     // Only one stream per demuxer is supported.
747     ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
748
749     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
750
751 #ifndef GST_DISABLE_GST_DEBUG
752     {
753         GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
754         GST_DEBUG("%s", strcaps.get());
755     }
756 #endif
757
758     if (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime())
759         m_mediaSourceClient->durationChanged(m_initialDuration);
760
761     parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
762
763     switch (m_streamType) {
764     case WebCore::MediaSourceStreamTypeGStreamer::Audio:
765         m_track = WebCore::AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
766         break;
767     case WebCore::MediaSourceStreamTypeGStreamer::Video:
768         m_track = WebCore::VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
769         break;
770     case WebCore::MediaSourceStreamTypeGStreamer::Text:
771         m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
772         break;
773     case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
774         GST_WARNING_OBJECT(m_pipeline.get(), "Unsupported track codec: %" GST_PTR_FORMAT, caps.get());
775         // 3.5.7 Initialization Segment Received
776         // 5.1. If the initialization segment contains tracks with codecs the user agent does not support, then run the
777         // append error algorithm and abort these steps.
778
779         // appendParsingFailed() will immediately cause a resetParserState() which will stop demuxing, then the
780         // AppendPipeline will be destroyed.
781         m_sourceBufferPrivate->appendParsingFailed();
782         return;
783     default:
784         GST_WARNING_OBJECT(m_pipeline.get(), "Pad has unknown track type, ignoring: %" GST_PTR_FORMAT, caps.get());
785         break;
786     }
787
788     m_appsinkCaps = WTFMove(caps);
789     m_playerPrivate->trackDetected(this, m_track, true);
790 }
791
792 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
793 {
794     // Note: This function can be called either from the streaming thread (e.g. if a strange initialization segment with
795     // incompatible tracks is appended and the srcpad disconnected) or -- more usually -- from the main thread, when
796     // a state change is made to bring the demuxer down. (State change operations run in the main thread.)
797     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
798
799     GST_DEBUG("Disconnecting appsink");
800
801     if (m_parser) {
802         assertedElementSetState(m_parser.get(), GST_STATE_NULL);
803         gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
804         m_parser = nullptr;
805     }
806
807     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
808 }
809
810 #if !LOG_DISABLED
811 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
812 {
813     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
814     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
815     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
816     return GST_PAD_PROBE_OK;
817 }
818 #endif
819
820 #if ENABLE(ENCRYPTED_MEDIA)
821 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation *padProbeInformation)
822 {
823     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
824     GstEvent* event = gst_pad_probe_info_get_event(info);
825     GST_DEBUG("Handling event %s on append pipeline appsinkPad", GST_EVENT_TYPE_NAME(event));
826     WebCore::AppendPipeline* appendPipeline = padProbeInformation->appendPipeline;
827
828     switch (GST_EVENT_TYPE(event)) {
829     case GST_EVENT_PROTECTION:
830         if (appendPipeline && appendPipeline->playerPrivate())
831             appendPipeline->playerPrivate()->handleProtectionEvent(event);
832         return GST_PAD_PROBE_DROP;
833     default:
834         break;
835     }
836
837     return GST_PAD_PROBE_OK;
838 }
839 #endif
840
841 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
842 {
843     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
844     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
845     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
846     return GST_PAD_PROBE_DROP;
847 }
848
849 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
850 {
851     // matroskademux sets GstSegment.start to the PTS of the first frame.
852     //
853     // This way in the unlikely case a user made a .mkv or .webm file where a certain portion of the movie is skipped
854     // (e.g. by concatenating a MSE initialization segment with any MSE media segment other than the first) and opened
855     // it with a regular player, playback would start immediately. GstSegment.duration is not modified in any case.
856     //
857     // Leaving the usefulness of that feature aside, the fact that it uses GstSegment.start is problematic for MSE.
858     // In MSE is not unusual to process unordered MSE media segments. In this case, a frame may have
859     // PTS <<< GstSegment.start and be discarded by downstream. This happens for instance in elements derived from
860     // audiobasefilter, such as opusparse.
861     //
862     // This probe remedies the problem by setting GstSegment.start to 0 in all cases, not only when the PTS of the first
863     // frame is zero.
864
865     ASSERT(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
866     GstEvent* event = static_cast<GstEvent*>(info->data);
867     if (event->type == GST_EVENT_SEGMENT) {
868         GstSegment segment;
869         gst_event_copy_segment(event, &segment);
870
871         segment.start = 0;
872
873         GRefPtr<GstEvent> newEvent = adoptGRef(gst_event_new_segment(&segment));
874         gst_event_replace(reinterpret_cast<GstEvent**>(&info->data), newEvent.get());
875     }
876     return GST_PAD_PROBE_OK;
877 }
878
879 } // namespace WebCore.
880
881 #endif // USE(GSTREAMER)