Use Optional::hasValue() instead of Optional::has_value()
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016, 2017 Metrological Group B.V.
3  * Copyright (C) 2016, 2017 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * along with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GStreamerCommon.h"
28 #include "GStreamerEMEUtilities.h"
29 #include "GStreamerMediaDescription.h"
30 #include "MediaSampleGStreamer.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35 #include <functional>
36 #include <gst/app/gstappsink.h>
37 #include <gst/app/gstappsrc.h>
38 #include <gst/gst.h>
39 #include <gst/pbutils/pbutils.h>
40 #include <gst/video/video.h>
41 #include <wtf/Condition.h>
42 #include <wtf/glib/GLibUtilities.h>
43 #include <wtf/glib/RunLoopSourcePriority.h>
44
45 GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
46 #define GST_CAT_DEFAULT webkit_mse_debug
47
48 namespace WebCore {
49
50 GType AppendPipeline::s_endOfAppendMetaType = 0;
51 const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr;
52 std::once_flag AppendPipeline::s_staticInitializationFlag;
53
// Zero-payload custom GstMeta used to tag the empty marker buffer pushed by
// pushNewBuffer() after each append. When appsrcEndOfAppendCheckerProbe() sees a
// buffer carrying this meta it knows all data of the current append has already
// flowed through the pipeline.
struct EndOfAppendMeta {
    GstMeta base;
    // Nothing to initialize: the meta carries no data, only its presence matters.
    static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; }
    // The marker must never be copied onto another buffer.
    static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); }
    static void free(GstMeta*, GstBuffer*) { }
};
60
// Registers the custom end-of-append GstMeta type with GStreamer. Invoked exactly
// once, via std::call_once() from the AppendPipeline constructor.
void AppendPipeline::staticInitialization()
{
    ASSERT(isMainThread());

    const char* tags[] = { nullptr };
    s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
    s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
}
69
70 #if !LOG_DISABLED
71 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
72 #endif
73
74 #if ENABLE(ENCRYPTED_MEDIA)
75 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
76 #endif
77
78 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
79
80 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
81
82 // Wrapper for gst_element_set_state() that emits a critical if the state change fails or is not synchronous.
83 static void assertedElementSetState(GstElement* element, GstState desiredState)
84 {
85     GstState oldState;
86     gst_element_get_state(element, &oldState, nullptr, 0);
87
88     GstStateChangeReturn result = gst_element_set_state(element, desiredState);
89
90     GstState newState;
91     gst_element_get_state(element, &newState, nullptr, 0);
92
93     if (desiredState != newState || result != GST_STATE_CHANGE_SUCCESS) {
94         GST_ERROR("AppendPipeline state change failed (returned %d): %" GST_PTR_FORMAT " %d -> %d (expected %d)",
95             static_cast<int>(result), element, static_cast<int>(oldState), static_cast<int>(newState), static_cast<int>(desiredState));
96         ASSERT_NOT_REACHED();
97     }
98 }
99
// Builds the demuxing pipeline used for SourceBuffer appends:
//   appsrc ! (qtdemux|matroskademux) ... appsink
// The demuxer and appsink are linked later, when the demuxer exposes a source pad
// (see connectDemuxerSrcPadToAppsinkFromStreamingThread()). The pipeline is left in
// PLAYING state, ready to accept buffers via pushNewBuffer().
AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
    : m_mediaSourceClient(mediaSourceClient.get())
    , m_sourceBufferPrivate(sourceBufferPrivate.get())
    , m_playerPrivate(&playerPrivate)
    , m_id(0)
    , m_wasBusAlreadyNotifiedOfAvailableSamples(false)
    , m_streamType(Unknown)
{
    ASSERT(isMainThread());
    // Register the end-of-append meta type the first time any AppendPipeline is created.
    std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);

    GST_TRACE("Creating AppendPipeline (%p)", this);

    // FIXME: give a name to the pipeline, maybe related with the track it's managing.
    // The track name is still unknown at this time, though.
    static size_t appendPipelineCount = 0;
    String pipelineName = String::format("append-pipeline-%s-%zu",
        m_sourceBufferPrivate->type().containerType().replace("/", "-").utf8().data(), appendPipelineCount++);
    m_pipeline = gst_pipeline_new(pipelineName.utf8().data());

    m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
    gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
    gst_bus_enable_sync_message_emission(m_bus.get());

    // Sync handlers run on the posting (streaming) thread; regular message handlers run on the main thread.
    g_signal_connect(m_bus.get(), "sync-message::error", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleErrorSyncMessage(message);
    }), this);
    g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleNeedContextSyncMessage(message);
    }), this);
    g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleStateChangeMessage(message);
    }), this);

    // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
    // below will already take the initial reference and we need an additional one for us.
    m_appsrc = gst_element_factory_make("appsrc", nullptr);

    // This probe intercepts the end-of-append marker buffers pushed by pushNewBuffer().
    GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
    gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) {
        return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo);
    }, this, nullptr);

    // Pick the demuxer matching the SourceBuffer container type (ISO BMFF or WebM).
    const String& type = m_sourceBufferPrivate->type().containerType();
    if (type.endsWith("mp4"))
        m_demux = gst_element_factory_make("qtdemux", nullptr);
    else if (type.endsWith("webm"))
        m_demux = gst_element_factory_make("matroskademux", nullptr);
    else
        ASSERT_NOT_REACHED();

    m_appsink = gst_element_factory_make("appsink", nullptr);

    gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
    gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
    gst_base_sink_set_async_enabled(GST_BASE_SINK(m_appsink.get()), FALSE); // No prerolls, no async state changes.
    gst_base_sink_set_drop_out_of_segment(GST_BASE_SINK(m_appsink.get()), FALSE);
    gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);

    GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
    g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) {
        if (isMainThread()) {
            // When changing the pipeline state down to READY the demuxer is unlinked and this triggers a caps notification
            // because the appsink loses its previously negotiated caps. We are not interested in these unnegotiated caps.
#ifndef NDEBUG
            GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(appendPipeline->m_appsink.get(), "sink"));
            GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
            ASSERT(!caps);
#endif
            return;
        }

        // Caps changes coming from the streaming thread are processed on the main thread.
        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
            appendPipeline->appsinkCapsChanged();
        });
    }), this);

#if !LOG_DISABLED
    // Logging-only probes that trace buffers entering the demuxer and the appsink.
    GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
    m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
    m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
    m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
    m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
    m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
    m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
#endif

#if ENABLE(ENCRYPTED_MEDIA)
    m_appsinkPadEventProbeInformation.appendPipeline = this;
    m_appsinkPadEventProbeInformation.description = "appsink event probe";
    m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr);
#endif

    // These signals won't be connected outside of the lifetime of "this".
    g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
        appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad);
    }), this);
    g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
        appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
    }), this);
    g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
        ASSERT(!isMainThread());
        GST_DEBUG("Posting no-more-pads task to main thread");
        // "no-more-pads" means the initialization segment has been completely parsed.
        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
            appendPipeline->didReceiveInitializationSegment();
        });
    }), this);
    g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) {
        appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
    }), this);
    g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
        // basesrc will emit an EOS after it has received a GST_FLOW_ERROR. That's the only case we are expecting.
        if (!appendPipeline->m_errorReceived) {
            GST_ERROR("Unexpected appsink EOS in AppendPipeline");
            ASSERT_NOT_REACHED();
        }
    }), this);

    // Add_many will take ownership of a reference. That's why we used an assignment before.
    gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
    gst_element_link(m_appsrc.get(), m_demux.get());

    assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);
}
224
// Tears down the pipeline. Order matters: first abort pending tasks so the streaming
// thread cannot block or re-enter, then disconnect every handler/probe that could fire
// during the state change, and only then drop the pipeline to NULL.
AppendPipeline::~AppendPipeline()
{
    GST_DEBUG_OBJECT(m_pipeline.get(), "Destructing AppendPipeline (%p)", this);
    ASSERT(isMainThread());

    // Forget all pending tasks and unblock the streaming thread if it was blocked.
    m_taskQueue.startAborting();

    // Disconnect all synchronous event handlers and probes susceptible of firing from the main thread
    // when changing the pipeline state.

    if (m_pipeline) {
        ASSERT(m_bus);
        g_signal_handlers_disconnect_by_data(m_bus.get(), this);
        gst_bus_disable_sync_message_emission(m_bus.get());
        gst_bus_remove_signal_watch(m_bus.get());
    }

    if (m_appsrc)
        g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);

    if (m_demux) {
#if !LOG_DISABLED
        // Remove the logging-only probe installed in the constructor.
        GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
        gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
#endif

        g_signal_handlers_disconnect_by_data(m_demux.get(), this);
    }

    if (m_appsink) {
        GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
        g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
        g_signal_handlers_disconnect_by_data(m_appsink.get(), this);

#if !LOG_DISABLED
        gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
#endif

#if ENABLE(ENCRYPTED_MEDIA)
        gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId);
#endif
    }

    // We can tear down the pipeline safely now.
    if (m_pipeline)
        gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
}
273
274 void AppendPipeline::handleErrorSyncMessage(GstMessage* message)
275 {
276     ASSERT(!isMainThread());
277     GST_WARNING_OBJECT(m_pipeline.get(), "Demuxing error: %" GST_PTR_FORMAT, message);
278     // Notify the main thread that the append has a decode error.
279     auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this]() {
280         m_errorReceived = true;
281         // appendParsingFailed() will cause resetParserState() to be called.
282         m_sourceBufferPrivate->appendParsingFailed();
283         return AbortableTaskQueue::Void();
284     });
285     // The streaming thread has now been unblocked because we are aborting in the main thread.
286     ASSERT(!response);
287 }
288
289 GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
290 {
291     ASSERT(!isMainThread());
292     m_streamingThread = &WTF::Thread::current();
293
294     GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
295     ASSERT(GST_IS_BUFFER(buffer));
296
297     GST_TRACE_OBJECT(m_pipeline.get(), "Buffer entered appsrcEndOfAppendCheckerProbe: %" GST_PTR_FORMAT, buffer);
298
299     EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType));
300     if (!endOfAppendMeta) {
301         // Normal buffer, nothing to do.
302         return GST_PAD_PROBE_OK;
303     }
304
305     GST_TRACE_OBJECT(m_pipeline.get(), "Posting end-of-append task to the main thread");
306     m_taskQueue.enqueueTask([this]() {
307         handleEndOfAppend();
308     });
309     return GST_PAD_PROBE_DROP;
310 }
311
312 void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
313 {
314     const gchar* contextType = nullptr;
315     gst_message_parse_context_type(message, &contextType);
316     GST_TRACE("context type: %s", contextType);
317
318     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
319     m_playerPrivate->handleSyncMessage(message);
320 }
321
322 void AppendPipeline::handleStateChangeMessage(GstMessage* message)
323 {
324     ASSERT(isMainThread());
325
326     if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
327         GstState currentState, newState;
328         gst_message_parse_state_changed(message, &currentState, &newState, nullptr);
329         CString sourceBufferType = String(m_sourceBufferPrivate->type().raw())
330             .replace("/", "_").replace(" ", "_")
331             .replace("\"", "").replace("\'", "").utf8();
332         CString dotFileName = String::format("webkit-append-%s-%s_%s",
333             sourceBufferType.data(),
334             gst_element_state_get_name(currentState),
335             gst_element_state_get_name(newState)).utf8();
336         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data());
337     }
338 }
339
340 gint AppendPipeline::id()
341 {
342     ASSERT(isMainThread());
343
344     if (m_id)
345         return m_id;
346
347     static gint s_totalAudio = 0;
348     static gint s_totalVideo = 0;
349     static gint s_totalText = 0;
350
351     switch (m_streamType) {
352     case Audio:
353         m_id = ++s_totalAudio;
354         break;
355     case Video:
356         m_id = ++s_totalVideo;
357         break;
358     case Text:
359         m_id = ++s_totalText;
360         break;
361     case Unknown:
362     case Invalid:
363         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
364         ASSERT_NOT_REACHED();
365         break;
366     }
367
368     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
369
370     return m_id;
371 }
372
373 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
374 {
375     ASSERT(isMainThread());
376
377     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
378     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
379
380     const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get());
381     if (!MediaPlayerPrivateGStreamerMSE::supportsCodec(originalMediaType)) {
382             m_presentationSize = WebCore::FloatSize();
383             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
384     } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
385         Optional<FloatSize> size = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get());
386         if (size.hasValue())
387             m_presentationSize = size.value();
388         else
389             m_presentationSize = WebCore::FloatSize();
390
391         m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
392     } else {
393         m_presentationSize = WebCore::FloatSize();
394         if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX))
395             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
396         else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX))
397             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
398     }
399 }
400
401 void AppendPipeline::appsinkCapsChanged()
402 {
403     ASSERT(isMainThread());
404
405     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
406     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
407
408     if (!caps)
409         return;
410
411     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
412     bool previousCapsWereNull = !m_appsinkCaps;
413
414     if (m_appsinkCaps != caps) {
415         m_appsinkCaps = WTFMove(caps);
416         m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull);
417     }
418 }
419
// Runs on the main thread once the end-of-append marker buffer has been seen by
// appsrcEndOfAppendCheckerProbe(): drains the samples batched in the appsink, then
// tells the SourceBuffer the append is complete.
void AppendPipeline::handleEndOfAppend()
{
    ASSERT(isMainThread());
    consumeAppsinkAvailableSamples();
    GST_TRACE_OBJECT(m_pipeline.get(), "Notifying SourceBufferPrivate the append is complete");
    sourceBufferPrivate()->didReceiveAllPendingSamples();
}
427
428 void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
429 {
430     ASSERT(isMainThread());
431
432     if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
433         GST_WARNING("Received sample without buffer from appsink.");
434         return;
435     }
436
437     RefPtr<MediaSampleGStreamer> mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());
438
439     GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
440         mediaSample->trackID().string().utf8().data(),
441         mediaSample->presentationTime().toString().utf8().data(),
442         mediaSample->decodeTime().toString().utf8().data(),
443         mediaSample->duration().toString().utf8().data(),
444         mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
445
446     // If we're beyond the duration, ignore this sample.
447     MediaTime duration = m_mediaSourceClient->duration();
448     if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
449         GST_DEBUG_OBJECT(m_pipeline.get(), "Detected sample (%s) beyond the duration (%s), discarding", mediaSample->presentationTime().toString().utf8().data(), duration.toString().utf8().data());
450         return;
451     }
452
453     // Add a gap sample if a gap is detected before the first sample.
454     if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) {
455         GST_DEBUG("Adding gap offset");
456         mediaSample->applyPtsOffset(MediaTime::zeroTime());
457     }
458
459     m_sourceBufferPrivate->didReceiveSample(*mediaSample);
460 }
461
462 void AppendPipeline::didReceiveInitializationSegment()
463 {
464     ASSERT(isMainThread());
465
466     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
467
468     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
469     initializationSegment.duration = m_mediaSourceClient->duration();
470
471     switch (m_streamType) {
472     case Audio: {
473         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
474         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
475         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
476         initializationSegment.audioTracks.append(info);
477         break;
478     }
479     case Video: {
480         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
481         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
482         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
483         initializationSegment.videoTracks.append(info);
484         break;
485     }
486     default:
487         GST_ERROR("Unsupported stream type or codec");
488         break;
489     }
490
491     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
492 }
493
494 AtomicString AppendPipeline::trackId()
495 {
496     ASSERT(isMainThread());
497
498     if (!m_track)
499         return AtomicString();
500
501     return m_track->id();
502 }
503
504 void AppendPipeline::consumeAppsinkAvailableSamples()
505 {
506     ASSERT(isMainThread());
507
508     GRefPtr<GstSample> sample;
509     int batchedSampleCount = 0;
510     while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
511         appsinkNewSample(WTFMove(sample));
512         batchedSampleCount++;
513     }
514
515     GST_TRACE_OBJECT(m_pipeline.get(), "batchedSampleCount = %d", batchedSampleCount);
516 }
517
// Implements SourceBuffer.resetParserState() by bouncing the pipeline through READY.
// All queued data and demuxer track state are lost.
void AppendPipeline::resetParserState()
{
    ASSERT(isMainThread());
    GST_DEBUG_OBJECT(m_pipeline.get(), "Handling resetParserState() in AppendPipeline by resetting the pipeline");

    // FIXME: Implementing a flush event-based resetParserState() would allow the initialization segment to
    // survive, in accordance with the spec.

    // This function restores the GStreamer pipeline to the same state it was when the AppendPipeline constructor
    // finished. All previously enqueued data is lost and the demuxer is reset, losing all pads and track data.

    // Unlock the streaming thread.
    m_taskQueue.startAborting();

    // Reset the state of all elements in the pipeline.
    assertedElementSetState(m_pipeline.get(), GST_STATE_READY);

    // The parser is torn down automatically when the demuxer is reset (see disconnectDemuxerSrcPadFromAppsinkFromAnyThread()).
    ASSERT(!m_parser);

    // Set the pipeline to PLAYING so that it can be used again.
    assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);

    // All processing related to the previous append has been aborted and the pipeline is idle.
    // We can listen again to new requests coming from the streaming thread.
    m_taskQueue.finishAborting();

#if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
    {
        static unsigned i = 0;
        // This is here for debugging purposes. It does not make sense to have it as class member.
        WTF::String dotFileName = String::format("reset-pipeline-%d", ++i);
        gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
    }
#endif
}
554
555 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
556 {
557     GST_TRACE_OBJECT(m_pipeline.get(), "pushing data buffer %" GST_PTR_FORMAT, buffer);
558     GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer);
559     // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. Neither of these should
560     // be true at this point.
561     g_return_val_if_fail(pushDataBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);
562
563     // Push an additional empty buffer that marks the end of the append.
564     // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful
565     // completion of the append.
566     //
567     // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the
568     // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case
569     // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous
570     // buffer has completed.
571
572     GstBuffer* endOfAppendBuffer = gst_buffer_new();
573     gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr);
574
575     GST_TRACE_OBJECT(m_pipeline.get(), "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer);
576     GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer);
577     g_return_val_if_fail(pushEndOfAppendBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);
578
579     return GST_FLOW_OK;
580 }
581
// "new-sample" handler, run on the streaming thread. Samples are not pulled here;
// instead a single task is posted so the main thread batch-consumes them.
GstFlowReturn AppendPipeline::handleAppsinkNewSampleFromStreamingThread(GstElement*)
{
    ASSERT(!isMainThread());
    if (&WTF::Thread::current() != m_streamingThread) {
        // m_streamingThread has been initialized in appsrcEndOfAppendCheckerProbe().
        // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
        // This error will only be raised if someone modifies the pipeline to include more than one streaming thread or
        // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken.
        // AppendPipeline should have only one streaming thread. Otherwise we can't detect reliably when an append has
        // been demuxed completely.
        GST_ERROR_OBJECT(m_pipeline.get(), "Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run.");
        ASSERT_NOT_REACHED();
    }

    // Only post one batch-consume task per burst of samples; the flag is cleared by
    // the main thread right before consuming.
    if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
        GST_TRACE("Posting appsink-new-sample task to the main thread");
        m_taskQueue.enqueueTask([this]() {
            m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
            consumeAppsinkAvailableSamples();
        });
    }

    return GST_FLOW_OK;
}
606
607 static GRefPtr<GstElement>
608 createOptionalParserForFormat(GstPad* demuxerSrcPad)
609 {
610     GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
611     GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
612     const char* mediaType = gst_structure_get_name(structure);
613
614     GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
615     GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));
616
617     if (!g_strcmp0(mediaType, "audio/x-opus")) {
618         GstElement* opusparse = gst_element_factory_make("opusparse", parserName.get());
619         ASSERT(opusparse);
620         g_return_val_if_fail(opusparse, nullptr);
621         return GRefPtr<GstElement>(opusparse);
622     }
623     if (!g_strcmp0(mediaType, "audio/x-vorbis")) {
624         GstElement* vorbisparse = gst_element_factory_make("vorbisparse", parserName.get());
625         ASSERT(vorbisparse);
626         g_return_val_if_fail(vorbisparse, nullptr);
627         return GRefPtr<GstElement>(vorbisparse);
628     }
629     if (!g_strcmp0(mediaType, "video/x-h264")) {
630         GstElement* h264parse = gst_element_factory_make("h264parse", parserName.get());
631         ASSERT(h264parse);
632         g_return_val_if_fail(h264parse, nullptr);
633         return GRefPtr<GstElement>(h264parse);
634     }
635
636     return nullptr;
637 }
638
// "pad-added" handler, run on the streaming thread. Performs a synchronous round-trip
// to the main thread (connectDemuxerSrcPadToAppsink()) to process the new track, then
// links demuxer -> [optional parser] -> appsink on this thread.
void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad)
{
    ASSERT(!isMainThread());

    GST_DEBUG("connecting to appsink");

    // Extra streams beyond the first get a black-hole probe that swallows their buffers.
    if (m_demux->numsrcpads > 1) {
        GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
        gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
        g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
        return;
    }

    GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));

    // Only one stream per demuxer is supported.
    ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));

    // Record the duration reported by the demuxer, falling back to +infinity when unknown.
    gint64 timeLength = 0;
    if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
        && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
        m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
    else
        m_initialDuration = MediaTime::positiveInfiniteTime();

    GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread");
    auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() {
        connectDemuxerSrcPadToAppsink(demuxerSrcPad);
        return AbortableTaskQueue::Void();
    });
    if (!response) {
        // The AppendPipeline has been destroyed or aborted before we received a response.
        return;
    }

    // Must be done in the thread we were called from (usually streaming thread).
    bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
        || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
        || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);

    if (isData) {
        // The appsink is added to the bin lazily, the first time a data stream is connected.
        GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
        if (!parent)
            gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());

        // Current head of the pipeline being built.
        GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;

        // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
        // the contained audio streams in order to know the duration of the frames.
        // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
        m_parser = createOptionalParserForFormat(currentSrcPad.get());
        if (m_parser) {
            gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
            gst_element_sync_state_with_parent(m_parser.get());

            GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
            GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));

            gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
            currentSrcPad = parserSrcPad;
        }

        gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());

        gst_element_sync_state_with_parent(m_appsink.get());

        GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
    }
}
709
// Main-thread half of the demuxer pad hookup: inspects the pad caps, updates the
// MediaSource duration when it was still unknown, and creates the matching
// audio/video/text track object. Invoked through the task queue from
// connectDemuxerSrcPadToAppsinkFromStreamingThread().
void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
{
    ASSERT(isMainThread());
    GST_DEBUG("Connecting to appsink");

    const String& type = m_sourceBufferPrivate->type().containerType();
    // matroskademux sets GstSegment.start to the first frame's PTS; the probe
    // forces it back to zero so unordered WebM media segments are not clipped
    // downstream (see matroskademuxForceSegmentStartToEqualZero).
    if (type.endsWith("webm"))
        gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);

    GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));

    // Only one stream per demuxer is supported.
    ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));

    GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));

#ifndef GST_DISABLE_GST_DEBUG
    {
        GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
        GST_DEBUG("%s", strcaps.get());
    }
#endif

    // Propagate the duration captured on the streaming thread if the MediaSource
    // doesn't have one yet.
    if (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime())
        m_mediaSourceClient->durationChanged(m_initialDuration);

    // Determines m_streamType from the caps. An extra ref is handed over because
    // the caps are moved into m_appsinkCaps below.
    parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));

    // Create the track object matching the detected stream type.
    switch (m_streamType) {
    case WebCore::MediaSourceStreamTypeGStreamer::Audio:
        m_track = WebCore::AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Video:
        m_track = WebCore::VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Text:
        m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
        GST_WARNING_OBJECT(m_pipeline.get(), "Unsupported track codec: %" GST_PTR_FORMAT, caps.get());
        // 3.5.7 Initialization Segment Received
        // 5.1. If the initialization segment contains tracks with codecs the user agent does not support, then run the
        // append error algorithm and abort these steps.

        // appendParsingFailed() will immediately cause a resetParserState() which will stop demuxing, then the
        // AppendPipeline will be destroyed.
        m_sourceBufferPrivate->appendParsingFailed();
        return;
    default:
        GST_WARNING_OBJECT(m_pipeline.get(), "Pad has unknown track type, ignoring: %" GST_PTR_FORMAT, caps.get());
        break;
    }

    // Remember the caps of this stream and notify the player about the new track.
    m_appsinkCaps = WTFMove(caps);
    m_playerPrivate->trackDetected(this, m_track, true);
}
766
767 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
768 {
769     // Note: This function can be called either from the streaming thread (e.g. if a strange initialization segment with
770     // incompatible tracks is appended and the srcpad disconnected) or -- more usually -- from the main thread, when
771     // a state change is made to bring the demuxer down. (State change operations run in the main thread.)
772     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
773
774     GST_DEBUG("Disconnecting appsink");
775
776     if (m_parser) {
777         assertedElementSetState(m_parser.get(), GST_STATE_NULL);
778         gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
779         m_parser = nullptr;
780     }
781
782     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
783 }
784
785 #if !LOG_DISABLED
786 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
787 {
788     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
789     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
790     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
791     return GST_PAD_PROBE_OK;
792 }
793 #endif
794
795 #if ENABLE(ENCRYPTED_MEDIA)
796 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation *padProbeInformation)
797 {
798     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
799     GstEvent* event = gst_pad_probe_info_get_event(info);
800     GST_DEBUG("Handling event %s on append pipeline appsinkPad", GST_EVENT_TYPE_NAME(event));
801     WebCore::AppendPipeline* appendPipeline = padProbeInformation->appendPipeline;
802
803     switch (GST_EVENT_TYPE(event)) {
804     case GST_EVENT_PROTECTION:
805         if (appendPipeline && appendPipeline->playerPrivate())
806             appendPipeline->playerPrivate()->handleProtectionEvent(event);
807         return GST_PAD_PROBE_DROP;
808     default:
809         break;
810     }
811
812     return GST_PAD_PROBE_OK;
813 }
814 #endif
815
816 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
817 {
818     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
819     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
820     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
821     return GST_PAD_PROBE_DROP;
822 }
823
824 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
825 {
826     // matroskademux sets GstSegment.start to the PTS of the first frame.
827     //
828     // This way in the unlikely case a user made a .mkv or .webm file where a certain portion of the movie is skipped
829     // (e.g. by concatenating a MSE initialization segment with any MSE media segment other than the first) and opened
830     // it with a regular player, playback would start immediately. GstSegment.duration is not modified in any case.
831     //
832     // Leaving the usefulness of that feature aside, the fact that it uses GstSegment.start is problematic for MSE.
833     // In MSE is not unusual to process unordered MSE media segments. In this case, a frame may have
834     // PTS <<< GstSegment.start and be discarded by downstream. This happens for instance in elements derived from
835     // audiobasefilter, such as opusparse.
836     //
837     // This probe remedies the problem by setting GstSegment.start to 0 in all cases, not only when the PTS of the first
838     // frame is zero.
839
840     ASSERT(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
841     GstEvent* event = static_cast<GstEvent*>(info->data);
842     if (event->type == GST_EVENT_SEGMENT) {
843         GstSegment segment;
844         gst_event_copy_segment(event, &segment);
845
846         segment.start = 0;
847
848         GRefPtr<GstEvent> newEvent = adoptGRef(gst_event_new_segment(&segment));
849         gst_event_replace(reinterpret_cast<GstEvent**>(&info->data), newEvent.get());
850     }
851     return GST_PAD_PROBE_OK;
852 }
853
854 } // namespace WebCore.
855
#endif // ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)